diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 14:43:14 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 14:43:14 +0000 |
commit | 93bd9b2405e5c8d5c4493d621297cc8765785f28 (patch) | |
tree | 2263f8360596fa66dfc767acdbe74bafa0eda59f | |
parent | c24eccc88801b77b06842aa0686b6582040630a4 (diff) | |
download | qpid-python-93bd9b2405e5c8d5c4493d621297cc8765785f28.tar.gz |
Revision: 503646
Author: rgreig
Date: 11:28:57, 05 February 2007
Message:
(Submitted by Rupert Smith)
This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo
Revision: 503637
Author: rgreig
Date: 11:17:08, 05 February 2007
Message:
(Submitted by Rupert Smith)
Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml
Revision: 503609
Author: ritchiem
Date: 09:49:59, 05 February 2007
Message:
Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Revision: 503604
Author: rgreig
Date: 09:40:04, 05 February 2007
Message:
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
----
Modified : /incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Revision: 503593
Author: ritchiem
Date: 08:58:30, 05 February 2007
Message:
Fixed bug in stop(). If a connection is opened not start()ed then closed a NullPointerException will be thrown as the Dispatcher has not been created.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Revision: 502655
Author: rgreig
Date: 16:59:14, 02 February 2007
Message:
(Submitted by Rupert Smith) Options moved to top of contructor. Were at bottom and not being used!
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Revision: 502627
Author: rgreig
Date: 15:31:30, 02 February 2007
Message:
(Submitted by Rupert Smith)
Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Revision: 502620
Author: rgreig
Date: 15:09:08, 02 February 2007
Message:
(Submitted by Rupert Smith)
Perftests improved with better timeout handling. Shared/unique destinations to ping now an option.
TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
Revision: 502610
Author: bhupendrab
Date: 14:26:32, 02 February 2007
Message:
QPID-84
tests for FSContextFactory deleted.fscontext.jar is not part of apache svn.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
Deleted : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest
Revision: 502576
Author: ritchiem
Date: 11:13:13, 02 February 2007
Message:
QPID-343 Performance test suite doesn't output missing message count on failure.
Updated PingAsyncTestPerf to output missing messsage count.
Updated PingPongProducer so it doesn't use AMQShortStringx.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@503703 13f79535-47bb-0310-9956-ffa450edef68
57 files changed, 675 insertions, 2130 deletions
diff --git a/qpid/java/AppliedPatches.txt b/qpid/java/AppliedPatches.txt index d60c8b1c75..6967a594a9 100644 --- a/qpid/java/AppliedPatches.txt +++ b/qpid/java/AppliedPatches.txt @@ -3,7 +3,7 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100 - Hard coded to return 0 -Latest Revision- -502576.499979 +503646,502576.499979 -Not Done- 502610 QPID-84 UPdate PropertiesFileInitialContextFactoryTest Remove referencetest @@ -18,7 +18,6 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100 501008 QPID-324 change sending of destination to byte representing T/Q/Other 501007 QPID-322 Test may habg isntead of fail of messages doesn't get through 501005 QPID-324 change sending of destination to byte representing T/Q/Other -501003 QPID-320 Imporve performance by remembering protocol version 500310 Patch to TransactedTest to clean up messages 500264 Change to Transacted Test 499874 QPID-319 management console update (QPID-50) @@ -64,8 +63,19 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100 -PARTIAL- 494650 QPID-268 Improvements to performance of generated code +501003 QPID-320 Imporve performance by remembering protocol version + * DeliveryManager Queue data size counting. -Done- +503646 local repository removed +503637 Junit-toolkit update +503609 Updated to performacen test to allow the use of shared destinations. +503604 QPID-326 add oldest Message on queue Notification and log notification in log file +503593 Fixed bug in session . stop() where it would NPE on close if not start()ed +502655 Moved option sets to top of constructor +502627 Fixed problem with losing message results. +502620 Improved timeout handling +502576 QPID-343 Performance test suite doesn't output missing message count on failure 502627 Fixed problem with losing message results 502620 Performance test improvement with better timeout handing 502576 QPID-343 performacne test suite doesn't output missing message count @@ -89,7 +99,6 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100 501010 QPID-322 Message reference count not incremented when added to unackMap. 501004 QPID-320 Simplify MessageListener setup 500284 Updated script details and added guard for traffic light being null -- 499979 QPID-315 Test classes to reproduce missing correlation id 499975 QPID-315 Moved message converstion to MessageConverter. 499781 Fixed race that caused duplicate date in log file diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index a14ac8459b..28a572eac9 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -42,6 +42,10 @@ <priority value="info"/> </category> + <category name="org.apache.qpid.server.queue.AMQQueueMBean"> + <priority value="info"/> + </category> + <category name="org.apache.qpid"> <priority value="warn"/> </category> diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index de6a8c0682..2a573535de 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -21,8 +21,79 @@ --> <virtualhosts> <virtualhost> - <path>/development</path> - <bind>direct://amq.direct//queue</bind> - <bind>direct://amq.direct//ping</bind> + <name>localhost</name> + + <localhost> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </localhost> + </virtualhost> + <virtualhost> + <name>development</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <development> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </development> + </virtualhost> + <virtualhost> + <name>test</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <test> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </test> </virtualhost> </virtualhosts> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 9ecbf3d31a..a433351509 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,9 +32,13 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.CompositeConfiguration; import java.util.Collection; +import java.util.List; +import java.util.Collections; public class VirtualHostConfiguration { @@ -42,11 +46,7 @@ public class VirtualHostConfiguration XMLConfiguration _config; - private static final String XML_VIRTUALHOST = "virtualhost"; - private static final String XML_PATH = "path"; - private static final String XML_BIND = "bind"; - private static final String XML_VIRTUALHOST_PATH = "virtualhost.path"; - private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind"; + private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; public VirtualHostConfiguration(String configFile) throws ConfigurationException @@ -55,135 +55,58 @@ public class VirtualHostConfiguration _config = new XMLConfiguration(configFile); - if (_config.getProperty(XML_VIRTUALHOST_PATH) == null) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } } - public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException - { - Object prop = _config.getProperty(XML_VIRTUALHOST_PATH); - if (prop instanceof Collection) - { - _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size()); - int virtualhosts = ((Collection) prop).size(); - for (int vhost = 0; vhost < virtualhosts; vhost++) - { - loadVirtualHost(vhost); - } - } - else - { - loadVirtualHost(-1); - } - } - - private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException + private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException { - String path = XML_VIRTUALHOST; + _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - if (index != -1) + if(virtualHostName == null) { - path = path + "(" + index + ")"; - } - - Object prop = _config.getProperty(path + "." + XML_PATH); - - if (prop == null) - { - prop = _config.getProperty(path + "." + XML_BIND); - String error = "Virtual Host not defined for binding"; - - if (prop != null) - { - if (prop instanceof Collection) - { - error += "s"; - } - - error += ": " + prop; - } - - throw new ConfigurationException(error); + throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } - _logger.info("VirtualHost:'" + prop + "'"); + List queueNames = configuration.getList("queue.name"); - prop = _config.getProperty(path + "." + XML_BIND); - if (prop instanceof Collection) + for(Object queueNameObj : queueNames) { - int bindings = ((Collection) prop).size(); - _logger.debug("Number of Bindings: " + bindings); - for (int dest = 0; dest < bindings; dest++) - { - loadBinding(path, dest); - } + String queueName = String.valueOf(queueNameObj); + configureQueue(queueName, configuration); } - else - { - loadBinding(path, -1); - } - } - private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException - { - String path = rootpath + "." + XML_BIND; - if (index != -1) - { - path = path + "(" + index + ")"; - } - - String bindingString = _config.getString(path); - - AMQBindingURL binding = new AMQBindingURL(bindingString); - - _logger.debug("Loaded Binding:" + binding); - - try - { - bind(binding); - } - catch (AMQException amqe) - { - _logger.info("Unable to bind url: " + binding); - throw amqe; - } } - private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException + private void configureQueue(String queueName, Configuration configuration) throws AMQException, ConfigurationException { + CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - String queueName = binding.getQueueName(); - - // This will occur if the URL is a Topic - if (queueName == null) - { - //todo register valid topic - ///queueName = binding.getDestinationName(); - throw new AMQException("Topics cannot be bound. TODO Register valid topic"); - } + queueConfiguration.addConfiguration(configuration.subset("queue."+ queueName)); + queueConfiguration.addConfiguration(configuration); - //Get references to Broker Registries QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore(); ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry(); + AMQQueue queue; + synchronized (queueRegistry) { - AMQQueue queue = queueRegistry.getQueue(queueName); + queue = queueRegistry.getQueue(queueName); if (queue == null) { - _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating."); + _logger.info("Creating queue '" + queueName + "' [on virtual host ]" ); + + boolean durable = queueConfiguration.getBoolean("durable" ,false); + boolean autodelete = queueConfiguration.getBoolean("autodelete", false); + String owner = queueConfiguration.getString("owner", null); queue = new AMQQueue(queueName, - Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)), - null /* These queues will have no owner */, - false /* Therefore autodelete makes no sence */, queueRegistry); + durable, + owner == null ? null : owner /* These queues will have no owner */, + autodelete /* Therefore autodelete makes no sence */, queueRegistry); if (queue.isDurable()) { @@ -194,27 +117,67 @@ public class VirtualHostConfiguration } else { - _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating."); + _logger.info("Queue '" + queueName + "' already exists [on virtual host ], not creating."); } - Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName()); - synchronized (defaultExchange) + String exchangeName = queueConfiguration.getString("exchange", null); + + Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : exchangeName); + + if(exchange == null) { - if (defaultExchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding); - } + exchange = exchangeRegistry.getDefaultExchange(); + } - defaultExchange.registerQueue(queue.getName(), queue, null); + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + } - if (binding.getRoutingKey() == null || binding.getRoutingKey().equals("")) + synchronized (exchange) + { + List routingKeys = queueConfiguration.getList("routingKey"); + if(routingKeys == null || routingKeys.isEmpty()) { - throw new ConfigurationException("Unknown binding not specified on url:" + binding); + routingKeys = Collections.singletonList(queue.getName()); } - queue.bind(binding.getRoutingKey(), defaultExchange); + for(Object routingKey : routingKeys) + { + exchange.registerQueue((String)routingKey, queue, null); + + queue.bind((String)routingKey, exchange); + + _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); + } } - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'"); + } + + + Configurator.configure(queue);//, queueConfiguration); } + + + public void performBindings() throws AMQException, ConfigurationException + { + List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); + + _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); + + for(Object nameObject : virtualHostNames) + { + String name = String.valueOf(nameObject); + configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); + } + + if (virtualHostNames == null || virtualHostNames.isEmpty()) + { + throw new ConfigurationException( + "Virtualhost Configuration document does not contain a valid virtualhost."); + } + } + + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index eb9d1acb59..92dc0fa43e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.exchange; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.log4j.Logger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -37,6 +37,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); + private Exchange _defaultExchange; + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: @@ -52,9 +54,23 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { + if(_defaultExchange == null) + { + setDefaultExchange(exchange); + } _exchangeMap.put(exchange.getName(), exchange); } + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + + public Exchange getDefaultExchange() + { + return _defaultExchange; + } + public void unregisterExchange(String name, boolean inUse) throws AMQException { // TODO: check inUse argument @@ -71,7 +87,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(String name) { - return _exchangeMap.get(name); + + if(name == null || name.length() == 0) + { + return _defaultExchange; + } + else + { + return _exchangeMap.get(name); + } + } /** @@ -82,14 +107,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void routeContent(AMQMessage payload) throws AMQException { final String exchange = payload.getPublishBody().exchange; - final Exchange exch = _exchangeMap.get(exchange); + final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between - // the JmsPublish being received (where the exchange is validated) and the final + // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) + // TODO: check where the exchange is validated if (exch == null) { throw new AMQException("Exchange '" + exchange + "' does not exist"); } exch.route(payload); } -} +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 4a0a6a0ee1..1d32ee17a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -37,4 +37,6 @@ public interface ExchangeRegistry extends MessageRouter void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException; Exchange getExchange(String name); + + Exchange getDefaultExchange(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index afe4ea95b9..8603113c11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class AMQMessage { + private static final Logger _log = Logger.getLogger(AMQMessage.class); + public static final String JMS_MESSAGE = "jms.message"; private final Set<Object> _tokens = new HashSet<Object>(); @@ -61,6 +64,8 @@ public class AMQMessage private final AtomicInteger _referenceCount = new AtomicInteger(1); + private long _arrivalTime; + /** * Keeps a track of how many bytes we have received in body frames */ @@ -157,20 +162,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { - + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. allFrames[0] = BasicDeliverBody.createAMQFrame(channel, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - getExchangeName(), // exchange - _redelivered, // redelivered - getRoutingKey() // routingKey - ); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { @@ -201,6 +206,8 @@ public class AMQMessage public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; + _arrivalTime = System.currentTimeMillis(); + if (_storeWhenComplete && isAllContentReceived()) { storeMessage(); @@ -223,6 +230,7 @@ public class AMQMessage _bodyLengthReceived += contentBody.getSize(); if (_storeWhenComplete && isAllContentReceived()) { + _arrivalTime = System.currentTimeMillis(); storeMessage(); } } @@ -263,6 +271,12 @@ public class AMQMessage _redelivered = redelivered; } + + public long getArrivalTime() + { + return _arrivalTime; + } + public long getMessageId() { return _messageId; @@ -299,6 +313,7 @@ public class AMQMessage throw new MessageCleanupException(_messageId, e); } } + } public void setPublisher(AMQProtocolSession publisher) @@ -367,11 +382,17 @@ public class AMQMessage return _txnBuffer; } + public long getSize() + { + return getContentHeaderBody().bodySize; + } + /** * Called to enforce the 'immediate' flag. + * * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * immediate delivery but has not been marked as delivered to a + * consumer */ public void checkDeliveredToConsumer() throws NoConsumersException { @@ -393,7 +414,8 @@ public class AMQMessage /** * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer + * + * @return _deliveredToConsumer */ public boolean getDeliveredToConsumer() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..d8bacc8c7d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; @@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable /** * max allowed size(KB) of a single message */ - private long _maxMessageSize = 10000; + private long _maximumMessageSize = 10000; /** * max allowed number of messages on a queue. */ - private Integer _maxMessageCount = 10000; + @Configured(path = "maximumMessageCount", defaultValue = "0") + public int _maximumMessageCount; /** - * max queue depth(KB) for the queue + * max queue depth for the queue */ - private long _maxQueueDepth = 10000000; + @Configured(path = "maximumQueueDepth", defaultValue = "0") + public long _maximumQueueDepth = 10000000; + + /* + * maximum message age before alerts occur + */ + @Configured(path = "maximumMessageAge", defaultValue = "0") + public long _maximumMessageAge = 30000; //0 + + /* + * the minimum interval between sending out consequetive alerts of the same type + */ + @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") + public long _minimumAlertRepeatGap = 30000; /** * total messages received by the queue since startup. */ - private long _totalMessagesReceived = 0; + public long _totalMessagesReceived = 0; public int compareTo(Object o) { @@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable _autoDelete = autoDelete; _queueRegistry = queueRegistry; _asyncDelivery = asyncDelivery; + _managedObject = createMBean(); _managedObject.register(); + _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - - //fixme - Make this configurable via the broker config.xml - if (System.getProperties().getProperty("deliverymanager") != null) - { - if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) - { - _logger.info("Using ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } - else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) - { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); - } - else - { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); - } - } - else - { - _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } private AMQQueueMBean createMBean() throws AMQException @@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + public long getQueueDepth() + { + return _deliveryMgr.getTotalMessageSize(); + } + /** * @param messageId * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. @@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable return _managedObject; } - public Long getMaximumMessageSize() + public long getMaximumMessageSize() { - return _maxMessageSize; + return _maximumMessageSize; } - public void setMaximumMessageSize(Long value) + public void setMaximumMessageSize(long value) { - _maxMessageSize = value; + _maximumMessageSize = value; } - public Integer getConsumerCount() + public int getConsumerCount() { return _subscribers.size(); } - public Integer getActiveConsumerCount() + public int getActiveConsumerCount() { return _subscribers.getWeight(); } - public Long getReceivedMessageCount() + public long getReceivedMessageCount() { return _totalMessagesReceived; } - public Integer getMaximumMessageCount() + public int getMaximumMessageCount() { - return _maxMessageCount; + return _maximumMessageCount; } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(int value) { - _maxMessageCount = value; + _maximumMessageCount = value; } - public Long getMaximumQueueDepth() + public long getMaximumQueueDepth() { - return _maxQueueDepth; + return _maximumQueueDepth; } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) + public void setMaximumQueueDepth(long value) { - _maxQueueDepth = value; + _maximumQueueDepth = value; + } + + public long getOldestMessageArrivalTime() + { + return _deliveryMgr.getOldestMessageArrival(); } /** @@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); - if(subscription.hasFilters()) + if (subscription.hasFilters()) { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } @@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable } } + public long getMinimumAlertRepeatGap() + { + return _minimumAlertRepeatGap; + } + + public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + { + _minimumAlertRepeatGap = minimumAlertRepeatGap; + } + + public long getMaximumMessageAge() + { + return _maximumMessageAge; + } + + public void setMaximumMessageAge(long maximumMessageAge) + { + _maximumMessageAge = maximumMessageAge; + } + + private class Deliver implements TxnOp { private final AMQMessage _msg; @@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index a914975e00..f5ecf6ba55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; import javax.management.openmbean.*; import javax.management.JMException; @@ -41,8 +42,11 @@ import java.util.ArrayList; * for an AMQQueue. */ @MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + + private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method @@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - + // OpenMBean data types for viewMessageContent method private static CompositeType _msgContentType = null; private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - + + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { init(); } - catch(JMException ex) + catch (JMException ex) { // It should never occur System.out.println(ex.getMessage()); @@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); + _msgContentAttributes, _msgContentAttributeTypes); _msgAttributeTypes[0] = SimpleType.LONG; // For message id _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes @@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue */ public void checkForNotification(AMQMessage msg) { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _queue.getMaximumMessageSize()) + for (NotificationCheck check : NotificationCheck.values()) { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } - - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); - if (queueDepth >= _queue.getMaximumQueueDepth()) - { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime) + { + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } } } /** * Sends the notification to the listeners */ - private void notifyClients(String notificationMsg) + public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) { + // important : add log to the log file - monitoring tools may be looking for this + _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java deleted file mode 100644 index 022d3b9635..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.server.configuration.Configurator; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicBoolean; - - -/** - * Manages delivery of messages on behalf of a queue - */ -public class ConcurrentDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); - - @Configured(path = "advanced.compressBufferOnQueue", - defaultValue = "false") - public boolean compressBufferOnQueue; - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - - /** - * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. - */ - private ReentrantLock _lock = new ReentrantLock(); - - - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - - //Set values from configuration - Configurator.configure(this); - - if (compressBufferOnQueue) - { - _log.info("Compressing Buffers on queue."); - } - - _subscriptions = subscriptions; - _queue = queue; - } - - /** - * @return boolean if we are queueing - */ - private boolean queueing() - { - return hasQueuedMessages(); - } - - - /** - * @param msg to enqueue - * @return true if we are queue this message - */ - private boolean enqueue(AMQMessage msg) - { - if (msg.isImmediate()) - { - return false; - } - else - { - _lock.lock(); - try - { - if (queueing()) - { - return addMessageToQueue(msg); - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - } - - private void startQueueing(AMQMessage msg) - { - if (!msg.isImmediate()) - { - addMessageToQueue(msg); - } - } - - private boolean addMessageToQueue(AMQMessage msg) - { - // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) - { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - } - - _messages.offer(msg); - - return true; - } - - - public boolean hasQueuedMessages() - { - - _lock.lock(); - try - { - return !_messages.isEmpty(); - } - finally - { - _lock.unlock(); - } - - - } - - public int getQueueMessageCount() - { - return getMessageCount(); - } - - /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. - * - * @return int the number of messages in the delivery queue. - */ - private int getMessageCount() - { - return _messages.size(); - } - - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop() throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(_queue); - } - } - - public synchronized void clearAllMessages() throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(_queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - AMQMessage message = peek(); - - //While we have messages to send and subscribers to send them to. - while (message != null && hasSubscribers) - { - // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); - Subscription next = _subscriptions.nextSubscriber(message); - //FIXME Is there still not the chance that this subscribe could be suspended between here and the send? - - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - next.send(message, _queue); - poll(); - message = peek(); - } - else - { - hasSubscribers = false; - } - } - } - catch (FailedDequeueException e) - { - _log.error("Unable to deliver message as dequeue failed: " + e, e); - } - finally - { - _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); - } - } - - private AMQMessage peek() - { - return _messages.peek(); - } - - private AMQMessage poll() - { - return _messages.poll(); - } - - Runner asyncDelivery = new Runner(); - - public void processAsync(Executor executor) - { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); - - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(asyncDelivery); - } - } - } - } - - public void deliver(String name, AMQMessage msg) throws FailedDequeueException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - // not queueing so deliver message to 'next' subscriber - _lock.lock(); - try - { - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - if (!msg.isImmediate()) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - } - else - { - s.send(msg, _queue); - } - } - finally - { - _lock.unlock(); - } - } - } - - private class Runner implements Runnable - { - public void run() - { - boolean running = true; - while (running) - { - processQueue(); - - //Check that messages have not been added since we did our last peek(); - // Synchronize with the thread that adds to the queue. - // If the queue is still empty then we can exit - _lock.lock(); - try - { - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) - { - running = false; - _processing.set(false); - } - } - finally - { - _lock.unlock(); - } - } - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f09e8213b1..9efeb8351c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -34,6 +34,7 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** @@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); - + private AtomicLong _totalMessageSize = new AtomicLong(); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.offer(msg); + _totalMessageSize.addAndGet(msg.getSize()); + return true; } @@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.size(); } + public long getTotalMessageSize() + { + return _totalMessageSize.get(); + } + + public long getOldestMessageArrival() + { + AMQMessage msg = _messages.peek(); + return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); + } + public synchronized List<AMQMessage> getMessages() { @@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (msg != null) { msg.dequeue(_queue); + _totalMessageSize.addAndGet(-msg.getSize()); } } @@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { msg.dequeue(_queue); + _totalMessageSize.set(0L); msg = poll(); } } @@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); + _totalMessageSize.addAndGet(-message.getSize()); } catch (FailedDequeueException e) { @@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg); @@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } //Deliver the message diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index cac499587f..28386dfa45 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -64,7 +64,8 @@ interface DeliveryManager * * @param name the name of the entity on whose behalf we are delivering the message * @param msg the message to deliver - * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued + * @throws org.apache.qpid.server.queue.FailedDequeueException + * if the message could not be dequeued */ void deliver(String name, AMQMessage msg) throws FailedDequeueException; @@ -75,4 +76,8 @@ interface DeliveryManager List<AMQMessage> getMessages(); void populatePreDeliveryQueue(Subscription subscription); + + long getTotalMessageSize(); + + long getOldestMessageArrival(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java new file mode 100644 index 0000000000..8e9b3804f2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -0,0 +1,135 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +public enum NotificationCheck +{ + + MESSAGE_COUNT_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + int msgCount = queue.getMessageCount(); + final int maximumMessageCount = queue.getMaximumMessageCount(); + if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount) + { + listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); + return true; + } + return false; + } + }, + MESSAGE_SIZE_ALERT(true) + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + final long maximumMessageSize = queue.getMaximumMessageSize(); + if(maximumMessageSize != 0) + { + // Check for threshold message size + long messageSize; +// try +// { + messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; +// } +// catch (AMQException e) +// { +// messageSize = 0; +// } + + + if (messageSize >= maximumMessageSize) + { + listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); + return true; + } + } + return false; + } + + }, + QUEUE_DEPTH_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + // Check for threshold queue depth in bytes + final long maximumQueueDepth = queue.getMaximumQueueDepth(); + + if(maximumQueueDepth != 0) + { + final long queueDepth = queue.getQueueDepth(); + + if (queueDepth >= maximumQueueDepth) + { + listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached."); + return true; + } + } + return false; + } + + }, + MESSAGE_AGE_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + + final long maxMessageAge = queue.getMaximumMessageAge(); + if(maxMessageAge != 0) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - maxMessageAge; + final long firstArrivalTime = queue.getOldestMessageArrivalTime(); + + if(firstArrivalTime < thresholdTime) + { + long oldestAge = currentTime - firstArrivalTime; + listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."); + + return true; + } + } + return false; + + } + + } + ; + + private final boolean _messageSpecific; + + NotificationCheck() + { + this(false); + } + + NotificationCheck(boolean messageSpecific) + { + _messageSpecific = messageSpecific; + } + + public boolean isMessageSpecific() + { + return _messageSpecific; + } + + abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java new file mode 100644 index 0000000000..bd9d7f6b11 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +public interface QueueNotificationListener +{ + void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java deleted file mode 100644 index c967ea2cde..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Manages delivery of messages on behalf of a queue - */ -class SynchronizedDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); - - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>(); - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * An indication of the mode we are in. If this is true then messages are - * being queued up in _messages for asynchronous delivery. If it is false - * then messages can be delivered directly as they come in. - */ - private volatile boolean _queueing; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - _subscriptions = subscriptions; - _queue = queue; - } - - private synchronized boolean enqueue(AMQMessage msg) - { - if (msg.isImmediate()) - { - return false; - } - else - { - if (_queueing) - { - _messages.offer(msg); - return true; - } - else - { - return false; - } - } - } - - private synchronized void startQueueing(AMQMessage msg) - { - _queueing = true; - enqueue(msg); - } - - /** - * Determines whether there are queued messages. Sets _queueing to false if - * there are no queued messages. This needs to be atomic. - * - * @return true if there are queued messages - */ - public synchronized boolean hasQueuedMessages() - { - boolean empty = _messages.isEmpty(); - if (empty) - { - _queueing = false; - } - return !empty; - } - - public synchronized int getQueueMessageCount() - { - return _messages.size(); - } - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop() throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(_queue); - } - } - - public synchronized void clearAllMessages() throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(_queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasQueuedMessages() && hasSubscribers) - { - Subscription next = _subscriptions.nextSubscriber(peek()); - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - try - { - next.send(poll(), _queue); - } - catch (AMQException e) - { - _log.error("Unable to deliver message: " + e, e); - } - } - else - { - hasSubscribers = false; - } - } - } - finally - { - _processing.set(false); - } - } - - private synchronized AMQMessage peek() - { - return _messages.peek(); - } - - private synchronized AMQMessage poll() - { - return _messages.poll(); - } - - /** - * Requests that the delivery manager start processing the queue asynchronously - * if there is work that can be done (i.e. there are messages queued up and - * subscribers that can receive them. - * <p/> - * This should be called when subscribers are added, but only after the consume-ok - * message has been returned as message delivery may start immediately. It should also - * be called after unsuspending a client. - * <p/> - * - * @param executor the executor on which the delivery should take place - */ - public void processAsync(Executor executor) - { - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(new Runner()); - } - } - } - } - - /** - * Handles message delivery. The delivery manager is always in one of two modes; - * it is either queueing messages for asynchronous delivery or delivering - * directly. - * - * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver - * @throws NoConsumersException if there are no active subscribers to deliver - * the message to - */ - public void deliver(String name, AMQMessage msg) throws FailedDequeueException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - synchronized(this) - { - // not queueing so deliver message to 'next' subscriber - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - else - { - s.send(msg, _queue); - } - } - } - - } - - private class Runner implements Runnable - { - public void run() - { - processQueue(); - } - } -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java deleted file mode 100644 index db871281bf..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameAlreadyBoundException; -import javax.naming.NamingException; -import javax.naming.NoInitialContextException; -import java.io.File; -import java.util.Hashtable; - -import junit.framework.TestCase; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Bind extends TestCase -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - String _connectionFactoryString = ""; - - String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'"; - Topic _topic = null; - - boolean _bound = false; - - public Bind() throws NameAlreadyBoundException, NoInitialContextException - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output, String providerURL) throws NameAlreadyBoundException, NoInitialContextException - { - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - Connection connection = null; - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Create the connection factory to be bound - ConnectionFactory connectionFactory = null; - // Create the Connection to be bound - - - try - { - connectionFactory = new AMQConnectionFactory(_connectionString); - connection = connectionFactory.createConnection(); - - _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL(); - } - catch (JMSException jmsqe) - { - fail("Unable to create Connection:" + jmsqe); - } - catch (URLSyntaxException urlse) - { - fail("Unable to create Connection:" + urlse); - } - - try - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _topic = session.createTopic("Fruity"); - } - catch (JMSException jmse) - { - - } - // Perform the binds - ctx.bind("ConnectionFactory", connectionFactory); - if (output) - { - System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL()); - } - ctx.bind("Connection", connection); - if (output) - { - System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL()); - } - ctx.bind("Topic", _topic); - if (output) - { - System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL()); - } - _bound = true; - - // Check that it is bound - //Object obj = ctx.lookup("Connection"); - //System.out.println(((AMQConnection)obj).toURL()); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - if (e instanceof NameAlreadyBoundException) - { - throw(NameAlreadyBoundException) e; - } - - if (e instanceof NoInitialContextException) - { - throw(NoInitialContextException) e; - } - } - finally - { - try - { - if (connection != null) - { - connection.close(); - } - } - catch (JMSException e) - { - //ignore just want it closed - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactoryString != null) - { - return _connectionFactoryString; - } - else - { - return ""; - } - } - - public String connectionValue() - { - if (_connectionString != null) - { - return _connectionString; - } - else - { - return ""; - } - } - - public String topicValue() - { - if (_topic != null) - { - return ((AMQTopic) _topic).toURL(); - } - else - { - return ""; - } - - } - - public boolean bound() - { - return _bound; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException - { - new Bind(true); - } -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java deleted file mode 100644 index 4731caca98..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.test.VMBrokerSetup; - -import javax.naming.NameAlreadyBoundException; -import javax.naming.NoInitialContextException; - -import junit.framework.TestCase; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -public class JNDIReferenceableTest extends TestCase -{ - // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84 - public void testReferenceable() - { - Bind b = null; - try - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException e) - { - if (new Unbind().unbound()) - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException ee) - { - fail("Unable to clear bound objects for test."); - } - } - else - { - fail("Unable to clear bound objects for test."); - } - } - } - catch (NoInitialContextException e) - { - fail("You don't have the File System SPI on you class path.\n" + - "This can be downloaded from sun here:\n" + - "http://java.sun.com/products/jndi/downloads/index.html\n" + - "Click : Download JNDI 1.2.1 & More button\n" + - "Download: File System Service Provider, 1.2 Beta 3\n" + - "and add the two jars in the lib dir to your class path."); - } - - assertTrue(b.bound()); - - Lookup l = new Lookup(b.getProviderURL()); - - assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue())); - - assertTrue(l.connectionValue().equals(b.connectionValue())); - - assertTrue(l.topicValue().equals(b.topicValue())); - - - Unbind u = new Unbind(); - - assertTrue(u.unbound()); - - } - - public static junit.framework.Test suite() - { - return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class)); - } -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java deleted file mode 100644 index b804ccb30c..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Lookup -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest"; - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - AMQTopic _topic = null; - AMQConnection _connection = null; - AMQConnectionFactory _connectionFactory = null; - private String _connectionURL; - - - public Lookup() - { - this(DEFAULT_PROVIDER_URL); - } - - public Lookup(String providerURL) - { - - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - _topic = (AMQTopic) ctx.lookup("Topic"); - - _connection = (AMQConnection) ctx.lookup("Connection"); - - _connectionURL = _connection.toURL(); - - _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory"); - //System.out.println(topic); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - finally - { - try - { - if (_connection != null) - { - _connection.close(); - } - } - catch (JMSException e) - { - //ignore just need to close - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactory != null) - { - return _connectionFactory.getConnectionURL().toString(); - } - return ""; - } - - public String connectionValue() - { - if (_connectionURL != null) - { - return _connectionURL; - } - return ""; - } - - public String topicValue() - { - if (_topic != null) - { - return _topic.toURL(); - } - return ""; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) - { - new Lookup(); - } -} - diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java deleted file mode 100644 index 869bc55d8f..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; -import javax.jms.Connection; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Unbind -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - boolean _unbound = false; - - public Unbind() - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Unbind(Boolean output) - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Unbind(String provider) - { - this(false, provider); - } - - public Unbind(boolean output, String providerURL) - { - PROVIDER_URL = providerURL; - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Remove the binding - ctx.unbind("ConnectionFactory"); - ctx.unbind("Connection"); - ctx.unbind("Topic"); - - // Check that it is gone - Object obj = null; - try - { - obj = ctx.lookup("ConnectionFactory"); - } - catch (NameNotFoundException ne) - { - if (output) - { - System.out.println("unbind ConnectionFactory successful"); - } - try - { - obj = ctx.lookup("Connection"); - try - { - ((Connection) obj).close(); - } - catch (JMSException e) - { - //ignore just need to close - } - } - catch (NameNotFoundException ne2) - { - if (output) - { - System.out.println("unbind Connection successful"); - } - - try - { - obj = ctx.lookup("Topic"); - } - catch (NameNotFoundException ne3) - { - if (output) - { - System.out.println("unbind Topic successful"); - } - _unbound = true; - } - } - } - - //System.out.println("unbind failed; object still there: " + obj); - - // Close the context when we're done - - ctx.close(); - - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - } - - public boolean unbound() - { - return _unbound; - } - - public static void main(String[] args) - { - - new Unbind(true); - } -} - diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java index ecc8f1d1e9..8c90bd772c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java @@ -197,7 +197,7 @@ public class DispatcherTest extends TestCase try { - _logger.error("Send additional messages"); + _logger.info("Send additional messages"); for (int msg = 0; msg < MSG_COUNT; msg++) { diff --git a/qpid/java/mvn-repo/README.txt b/qpid/java/mvn-repo/README.txt deleted file mode 100644 index c78cda6938..0000000000 --- a/qpid/java/mvn-repo/README.txt +++ /dev/null @@ -1,5 +0,0 @@ -Temporary local repository for jars that are not available in the central repository yet. -This was created because junit-toolkit is undergoing some development to use it with perf tests. Its an Apache licensed performance testing utility. -It takes 2-4 days to get something added to the central repo but want to keep up with changed to junit-toolkit. This repository holds a snapshot -of it, so that the build can get it from somewhere without breaking. It is anticipated that it will reach a stable 0.5 version within two weeks from -30/01/2007, at which time it will be placed in the central repo and this local repository will be deleted. diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar Binary files differdeleted file mode 100644 index 43c678f547..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar +++ /dev/null diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 deleted file mode 100644 index 87820b3b0a..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -8aff63861edb0a6bb47b5fad955a6ba5
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 deleted file mode 100644 index 5a72a41b79..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -600209771b236268f1b939e4a924899875ee8562
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom deleted file mode 100644 index 65587eb683..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom +++ /dev/null @@ -1,91 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><project> - <modelVersion>4.0.0</modelVersion> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit-maven-plugin</artifactId> - <packaging>maven-plugin</packaging> - <name>junit-toolkit-maven-plugin</name> - <version>0.5-20070130.111904-1</version> - <description>Maven plugin for the JUnit Toolkit to run performance tests with TKTestRunner.</description> - <url>http://www.thebadgerset.co.uk/projects/junit-toolkit-maven-plugin</url> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - <scm> - <connection>scm:${scm.setup}/junit-toolkit-maven-plugin</connection> - </scm> - <organization> - <name>The Badger Set trading as Liberty Bishop (1151) ltd.</name> - <url>http://www.thebadgerset.co.uk/</url> - </organization> - <build> - <sourceDirectory>src/main</sourceDirectory> - <pluginManagement> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.0.1</version> - <configuration> - <source>1.5</source> - <target>1.5</target> - <fork>false</fork> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - <dependencies> - <dependency> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit</artifactId> - <version>0.5-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.maven</groupId> - <artifactId>maven-plugin-api</artifactId> - <version>2.0.4</version> - </dependency> - </dependencies> - <reporting> - <plugins> - <plugin> - <artifactId>maven-pmd-plugin</artifactId> - <configuration> - <linkXref>true</linkXref> - <sourceEncoding>utf-8</sourceEncoding> - <minimumTokens>20</minimumTokens> - <targetJdk>1.5</targetJdk> - </configuration> - </plugin> - <plugin> - <artifactId>maven-jxr-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <configLocation>../mavenbuild/coding_standards.xml</configLocation> - </configuration> - </plugin> - </plugins> - </reporting> - <distributionManagement> - <repository> - <uniqueVersion>false</uniqueVersion> - <id>release-repo</id> - <name>The Badger Set Maven2 Repository</name> - <url>file://c:/temp</url> - </repository> - <snapshotRepository> - <id>snapshot-repo</id> - <name>The Badger Set Maven2 Snapshot Repository</name> - <url>file://c:/temp</url> - </snapshotRepository> - <status>deployed</status> - </distributionManagement> -</project>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 deleted file mode 100644 index adf20d93ad..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -4ab65f208ffa4400551233321b90933a
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 deleted file mode 100644 index aeb3966048..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -84f491024bd60142781ef9035f4394cb1379902d
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml deleted file mode 100644 index 0a46c1d79a..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><metadata> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit-maven-plugin</artifactId> - <version>0.5-SNAPSHOT</version> - <versioning> - <snapshot> - <timestamp>20070130.111904</timestamp> - <buildNumber>1</buildNumber> - </snapshot> - <lastUpdated>20070130111904</lastUpdated> - </versioning> -</metadata>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 deleted file mode 100644 index 4e7eab390b..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -c619b7ac915b2eba622d556b2d2e0c25
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 deleted file mode 100644 index 83a5267307..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -db7b5d51a53a5018611391ecc3346032a6c20dda
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml deleted file mode 100644 index a3bff0dde2..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><metadata> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit-maven-plugin</artifactId> - <version>0.5-SNAPSHOT</version> - <versioning> - <latest>0.5-SNAPSHOT</latest> - <versions> - <version>0.5-SNAPSHOT</version> - </versions> - <lastUpdated>20070130111904</lastUpdated> - </versioning> -</metadata>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 deleted file mode 100644 index 395a968533..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -da47ce66de64d4ba056d0a9c901c5676
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 deleted file mode 100644 index b396785e6c..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -70adf93da1c1757152e954750ceb2477a8659a99
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar Binary files differdeleted file mode 100644 index b5c0129ea9..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar +++ /dev/null diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 deleted file mode 100644 index 2f9b7922bd..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -ce27581a94a89b664830f3c355dd7bf5
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 deleted file mode 100644 index 3c17b5a8e4..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ffecddfd23345c7fb4177ab1d89cf73a4fe7adc9
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom deleted file mode 100644 index a2f72deff6..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom +++ /dev/null @@ -1,111 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><project> - <modelVersion>4.0.0</modelVersion> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit</artifactId> - <name>junit-toolkit</name> - <version>0.5-20070202.132554-1</version> - <description>JUnit Toolkit enhances JUnit with performance testing, asymptotic behaviour analysis, and concurrency testing.</description> - <url>http://www.thebadgerset.co.uk/junit-toolkit</url> - <developers> - <developer> - <id>rupert</id> - <name>Rupert Smith</name> - <email>rupertlssmith (contactable on g-m-a-i-l)</email> - <organization></organization> - </developer> - </developers> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - <scm> - <connection>scm:svn:http://www.thebadgerset.co.uk/svn/junit-toolkit</connection> - </scm> - <organization> - <name>The Badger Set trading as Liberty Bishop (1151) ltd.</name> - <url>http://www.thebadgerset.co.uk/</url> - </organization> - <build> - <sourceDirectory>src/main</sourceDirectory> - <pluginManagement> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.0.1</version> - <configuration> - <source>1.5</source> - <target>1.5</target> - <fork>false</fork> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - <pluginRepositories> - <pluginRepository> - <snapshots /> - <id>apache.snapshots</id> - <name>Apache SNAPSHOT Repository</name> - <url>http://people.apache.org/repo/m2-snapshot-repository</url> - </pluginRepository> - </pluginRepositories> - <dependencies> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>3.8.1</version> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.8</version> - </dependency> - <dependency> - <groupId>regexp</groupId> - <artifactId>regexp</artifactId> - <version>1.3</version> - </dependency> - </dependencies> - <reporting> - <plugins> - <plugin> - <artifactId>maven-pmd-plugin</artifactId> - <configuration> - <linkXref>true</linkXref> - <sourceEncoding>utf-8</sourceEncoding> - <minimumTokens>20</minimumTokens> - <targetJdk>1.5</targetJdk> - </configuration> - </plugin> - <plugin> - <artifactId>maven-jxr-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <configLocation>../mavenbuild/coding_standards.xml</configLocation> - </configuration> - </plugin> - </plugins> - </reporting> - <distributionManagement> - <repository> - <uniqueVersion>false</uniqueVersion> - <id>release-repo</id> - <name>The Badger Set Maven2 Repository</name> - <url>file://c:/temp</url> - </repository> - <snapshotRepository> - <id>snapshot-repo</id> - <name>The Badger Set Maven2 Snapshot Repository</name> - <url>file://c:/temp</url> - </snapshotRepository> - <status>deployed</status> - </distributionManagement> -</project>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 deleted file mode 100644 index ec5938d266..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -36d35e778356cef8a984a021d9bc0fe4
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 deleted file mode 100644 index f889d5fda6..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -9383e1d89168d83973f47595063a35b733a3854d
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml deleted file mode 100644 index 42bb1afc36..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><metadata> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit</artifactId> - <version>0.5-SNAPSHOT</version> - <versioning> - <snapshot> - <timestamp>20070202.132554</timestamp> - <buildNumber>1</buildNumber> - </snapshot> - <lastUpdated>20070202132554</lastUpdated> - </versioning> -</metadata>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 deleted file mode 100644 index 2d29a790e5..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -277e07c561ec6eebda9d1a470a2ce6a4
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 deleted file mode 100644 index 0b047b645a..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -155e5e47c236cefb5668414e81cc485867bdb93e
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml deleted file mode 100644 index 76b0c44645..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml +++ /dev/null @@ -1,11 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><metadata> - <groupId>uk.co.thebadgerset</groupId> - <artifactId>junit-toolkit</artifactId> - <version>0.5-SNAPSHOT</version> - <versioning> - <versions> - <version>0.5-SNAPSHOT</version> - </versions> - <lastUpdated>20070202132554</lastUpdated> - </versioning> -</metadata>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 deleted file mode 100644 index a57795d6c0..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -77e2a30606515c4f52bcb466c3966f63
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 deleted file mode 100644 index 6b612f5d27..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -111f4320859e398337dbf4e396ec2fad1e5aa8dd
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml deleted file mode 100644 index 4c367f55d3..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml +++ /dev/null @@ -1,9 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?><metadata> - <plugins> - <plugin> - <name>junit-toolkit-maven-plugin</name> - <prefix>junit-toolkit</prefix> - <artifactId>junit-toolkit-maven-plugin</artifactId> - </plugin> - </plugins> -</metadata>
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 deleted file mode 100644 index da122e37da..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -8eee1c76f27e4d20ffcd48d87897b923
\ No newline at end of file diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 deleted file mode 100644 index fd57ffc943..0000000000 --- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -300ebe0bfa8ae2b56d26a9fae9073ef9f902b0e7
\ No newline at end of file diff --git a/qpid/java/perftests/pom.xml b/qpid/java/perftests/pom.xml index 02f55b43ee..3bcda0f359 100644 --- a/qpid/java/perftests/pom.xml +++ b/qpid/java/perftests/pom.xml @@ -44,7 +44,7 @@ <repository> <id>junit-toolkit.snapshots</id> <name>JUnit Toolkit SNAPSHOT Repository</name> - <url>file://${basedir}/../mvn-repo</url> + <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url> <snapshots> <enabled>true</enabled> </snapshots> @@ -56,7 +56,7 @@ <pluginRepository> <id>junit-toolkit-plugin.snapshots</id> <name>JUnit Toolkit SNAPSHOT Repository</name> - <url>file://${basedir}/../mvn-repo</url> + <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url> <snapshots> <enabled>true</enabled> </snapshots> @@ -148,19 +148,7 @@ <name>amqj.test.logging.level</name> <value>info</value> </property> - <property> - <name>logdir</name> - <value>$QPID_WORK/results</value> - </property> - <property> - <name>-Xms</name> - <value>256m</value> - </property> - <property> - <name>-Xmx</name> - <value>512m</value> - </property> - </systemproperties> + </systemproperties> <commands> <!-- Single pings. These can be scaled up by overriding the parameters when calling the test script. --> @@ -200,147 +188,19 @@ -n Ping-Failover-After-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterCommit=true </Ping-Failover-After-Commit> - - <!-- P2P Volume Tests. --> - <VT-Qpid-1>-n VT-Qpid-1 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</VT-Qpid-1> - <VT-Qpid-2>-n VT-Qpid-2 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</VT-Qpid-2> - <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. --> - <VT-Qpid-3>-n VT-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000 transacted=true</VT-Qpid-3> - <VT-Qpid-4>-n VT-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000</VT-Qpid-4> - - <!-- P2P Scalability Tests. --> - <!-- 250,000 Total, 1P-1T-1C --> - <PT-Qpid-1>-n PT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-1> - <PT-Qpid-2>-n PT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-2> - - <!-- 25000 Msgs * 10 Brokers = 250,000 Total, 10P-1Q-10C --> - <PT-Qpid-3>-n PT-Qpid-3 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-3> - <PT-Qpid-4>-n PT-Qpid-4 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-4> - - <!-- 25000 Msgs * 10 Brokers = 250,000 Tota,l 10P-10T-10C 10*(1P-1Q-1C) --> - <PT-Qpid-5>-n PT-Qpid-5 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1 transacted=true</PT-Qpid-5> - <PT-Qpid-6>-n PT-Qpid-6 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1</PT-Qpid-6> - - <!-- 2500 Msgs * 10 Brokers * 10 Topics/Clients = 250,000 Total, 10P-100T-10C 10*(1P-10T-1C) --> - <PT-Qpid-7>-n PT-Qpid-7 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10 transacted=true</PT-Qpid-7> - <PT-Qpid-8>-n PT-Qpid-8 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10</PT-Qpid-8> - - <!-- 2500 Msgs * 100 Brokers = 250,000 Total, 100P-100T-100C 100*(1P-1T-1C) --> - <PT-Qpid-9>-n PT-Qpid-9 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=500</PT-Qpid-9> - <PT-Qpid-10>-n PT-Qpid-10 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-10> + </commands> + </configuration> - <!-- 250 Msgs * 100 Brokers * 10 Clients = 250,000 Total, 100P-1000T-100C 100*(1P-10T-1C) --> - <PT-Qpid-11>-n PT-Qpid-11 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10 transacted=true CommitBatchSize=50</PT-Qpid-11> - <PT-Qpid-12>-n PT-Qpid-12 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10</PT-Qpid-12> + <executions> + <execution> + <phase>test</phase> + <!--<goals> + <goal>tktest</goal> + </goals>--> + </execution> + </executions> + </plugin> - <!-- 250 Msgs * 1000 Brokers = 250,000 Total, 1000P-1000T-1000C 1000*(1P-1T-1C) --> - <PT-Qpid-13>-n PT-Qpid-13 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=50</PT-Qpid-13> - <PT-Qpid-14>-n PT-Qpid-14 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-14> - - <!-- P2P Volume Tests. --> - <VQ-Qpid-1>-n VQ-Qpid-1 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</VQ-Qpid-1> - <VQ-Qpid-2>-n VQ-Qpid-2 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</VQ-Qpid-2> - <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. --> - <VQ-Qpid-3>-n VQ-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 transacted=true</VQ-Qpid-3> - <VQ-Qpid-4>-n VQ-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 </VQ-Qpid-4> - - <!-- P2P Scalability Tests. --> - <!-- 15,000 Total, 1P-1Q-1C --> - <PQ-Qpid-1>-n PQ-Qpid-1 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true</PQ-Qpid-1> - <PQ-Qpid-2>-n PQ-Qpid-2 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf</PQ-Qpid-2> - - <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-1Q-10C --> - <PQ-Qpid-3>-n PQ-Qpid-3 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping transacted=true CommitBatchSize=500</PQ-Qpid-3> - <PQ-Qpid-4>-n PQ-Qpid-4 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping</PQ-Qpid-4> - - <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-10Q-10C 10*(1P-1Q-1C) --> - <PQ-Qpid-5>-n PQ-Qpid-5 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1 transacted=true CommitBatchSize=500</PQ-Qpid-5> - <PQ-Qpid-6>-n PQ-Qpid-6 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1</PQ-Qpid-6> - - <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-100Q-10C 10*(1P-10Q-1C) --> - <PQ-Qpid-7>-n PQ-Qpid-7 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=500</PQ-Qpid-7> - <PQ-Qpid-8>-n PQ-Qpid-8 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-8> - - <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-100Q-100C 100*(1P-1Q-1C) --> - <PQ-Qpid-9>-n PQ-Qpid-9 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 transacted=true CommitBatchSize=50</PQ-Qpid-9> - <PQ-Qpid-10>-n PQ-Qpid-10 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 </PQ-Qpid-10> - - <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-1000Q-100C 100*(1P-10Q-1C) --> - <PQ-Qpid-11>-n PQ-Qpid-11 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=50</PQ-Qpid-11> - <PQ-Qpid-12>-n PQ-Qpid-12 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-12> - - <!-- 15 Messages * 1000 Brokers = 15,000 Total, 1000P-1000Q-1000C 1000*(1P-1Q-1C) --> - <PQ-Qpid-13>-n PQ-Qpid-13 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 transacted=true CommitBatchSize=15</PQ-Qpid-13> - <PQ-Qpid-14>-n PQ-Qpid-14 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 </PQ-Qpid-14> - - <!-- Increasing Message Payload Tests. --> - <!-- Topic Testing --> - <LT-Qpid-1-512b>-n LT-Qpid-1-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-1-512b> - <LT-Qpid-2-512b>-n LT-Qpid-2-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512</LT-Qpid-2-512b> - - <LT-Qpid-1-1K>-n LT-Qpid-1-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</LT-Qpid-1-1K> - <LT-Qpid-2-1K>-n LT-Qpid-2-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</LT-Qpid-2-1K> - - <LT-Qpid-1-5K>-n LT-Qpid-1-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-1-5K> - <LT-Qpid-2-5K>-n LT-Qpid-2-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120</LT-Qpid-2-5K> - - <LT-Qpid-1-10K>-n LT-Qpid-1-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-1-10K> - <LT-Qpid-2-10K>-n LT-Qpid-2-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 </LT-Qpid-2-10K> - - <LT-Qpid-1-50K>-n LT-Qpid-1-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-1-50K> - <LT-Qpid-2-50K>-n LT-Qpid-2-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200</LT-Qpid-2-50K> - - <LT-Qpid-1-100K>-n LT-Qpid-1-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-1-100K> - <LT-Qpid-2-100K>-n LT-Qpid-2-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400</LT-Qpid-2-100K> - - <LT-Qpid-1-1M>-n LT-Qpid-1-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-1-1M> - <LT-Qpid-2-1M>-n LT-Qpid-2-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048476</LT-Qpid-2-1M> - - <!-- Queue Testing --> - <LT-Qpid-3-512b>-n LT-Qpid-3-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-3-512b> - <LT-Qpid-4-512b>-n LT-Qpid-4-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512</LT-Qpid-4-512b> - - <LT-Qpid-3-1K>-n LT-Qpid-3-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</LT-Qpid-3-1K> - <LT-Qpid-4-1K>-n LT-Qpid-4-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</LT-Qpid-4-1K> - - <LT-Qpid-3-5K>-n LT-Qpid-3-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-3-5K> - <LT-Qpid-4-5K>-n LT-Qpid-4-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120</LT-Qpid-4-5K> - - <LT-Qpid-3-10K>-n LT-Qpid-3-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-3-10K> - <LT-Qpid-4-10K>-n LT-Qpid-4-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240</LT-Qpid-4-10K> - - <LT-Qpid-3-50K>-n LT-Qpid-3-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-3-50K> - <LT-Qpid-4-50K>-n LT-Qpid-4-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200</LT-Qpid-4-50K> - - <LT-Qpid-3-100K>-n LT-Qpid-3-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-3-100K> - <LT-Qpid-4-100K>-n LT-Qpid-4-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400</LT-Qpid-4-100K> - - <LT-Qpid-3-1M>-n LT-Qpid-3-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-3-1M> - <LT-Qpid-4-1M>-n LT-Qpid-4-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 </LT-Qpid-4-1M> - - <!-- Failover Tests. --> - <!-- Transactional --> - <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailBeforeSend=true</FT-Qpid-1> - <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterSend=true</FT-Qpid-2> - <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterCommit=true</FT-Qpid-3> - - <!-- Non Transactional --> - <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailBeforeSend=true</FT-Qpid-4> - <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailAfterSend=true</FT-Qpid-5> - - - </commands> - </configuration> - - <executions> - <execution> - <phase>test</phase> - <!--<goals> - <goal>tktest</goal> - </goals>--> - </execution> - </executions> - </plugin> <!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests. Usefull when bundling system, integration or performance tests into a convenient diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index a295919565..a52b5a0c49 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -38,6 +38,8 @@ import org.apache.qpid.requestreply.PingPongProducer; */ public class PingClient extends PingPongProducer { + private static int _pingClientCount; + /** * Creates a ping producer with the specified parameters, of which there are many. See their individual comments * for details. This constructor creates ping pong producer but de-registers its reply-to destination message @@ -76,6 +78,8 @@ public class PingClient extends PingPongProducer super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate, pubsub, unique); + + _pingClientCount++; } /** @@ -88,4 +92,17 @@ public class PingClient extends PingPongProducer { return _pingDestinations; } + + public int getConsumersPerTopic() + { + if (_isUnique) + { + return 1; + } + else + { + return _pingClientCount; + } + } + } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 027bb04a12..6f444bd290 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -50,21 +50,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle; /** * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back * client (see {@link PingPongBouncer} for the bounce back client). - * + * <p/> * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. * This means that this class has to do some work to correlate pings with pongs; it expectes the original message * correlation id in the ping to be bounced back in the reply correlation id. - * + * <p/> * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform * failover testing. - * + * <p/> * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is * also registered to terminate the ping-pong loop cleanly. - * + * <p/> * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Provide a ping and wait for all responses cycle. @@ -72,25 +72,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle; * </table> * * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping. - * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to - * be created and configured by the test runner from the -f command line option and made available through - * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware - * tests. - * + * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to + * be created and configured by the test runner from the -f command line option and made available through + * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware + * tests. * @todo Make acknowledege mode a test option. - * * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than - * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process - * messages concurrently for different ids. Needs to be static so that when using a chained message listener and - * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to - * be picked up by the PPP that it is atteched to. - * + * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process + * messages concurrently for different ids. Needs to be static so that when using a chained message listener and + * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to + * be picked up by the PPP that it is atteched to. * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock - * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message - * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means - * that the last message waits until all other messages have been handled before releasing producers but allows - * messages to be processed concurrently, unlike the current synchronized block. - * + * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message + * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means + * that the last message waits until all other messages have been handled before releasing producers but allows + * messages to be processed concurrently, unlike the current synchronized block. * @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers? */ public class PingPongProducer implements Runnable, MessageListener, ExceptionListener @@ -228,9 +224,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - /** A source for providing unique ids to PingPongProducer. */ - private static AtomicInteger _pingProducerIdGenerator; - /** * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross * multiple ping producers on the same JVM. @@ -238,7 +231,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /*private static Map<String, CountDownLatch> trafficLights = Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/ private static Map<String, PerCorrelationId> perCorrelationIds = - Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); /** A convenient formatter to use when time stamping output. */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); @@ -273,6 +266,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Flag used to indicate if this is a point to point or pub/sub ping client. */ protected boolean _isPubSub = false; + /** Flag used to indicate if the destinations should be unique client. */ + protected static boolean _isUnique = false; + /** * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers * on the same JVM using this id generator will allow them to ping on the same queues. @@ -313,6 +309,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected int _txBatchSize = 1; /** + * Holds the number of consumers that will be attached to each topic. + * Each pings will result in a reply from each of the attached clients + */ + static int _consumersPerTopic = 1; + + /** * Creates a ping producer with the specified parameters, of which there are many. See their individual comments * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, * to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves. @@ -339,7 +341,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * possible, with no rate restriction. * @param pubsub True to ping topics, false to ping queues. * @param unique True to use unique destinations for each ping pong producer, false to share. - * * @throws Exception Any exceptions are allowed to fall through. */ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, @@ -358,6 +359,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called"); + // Keep all the relevant options. + _persistent = persistent; + _messageSize = messageSize; + _verbose = verbose; + _failAfterCommit = afterCommit; + _failBeforeCommit = beforeCommit; + _failAfterSend = afterSend; + _failBeforeSend = beforeSend; + _failOnce = failOnce; + _txBatchSize = txBatchSize; + _isPubSub = pubsub; + _isUnique = unique; + // Check that one or more destinations were specified. if (noOfDestinations < 1) { @@ -388,18 +402,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis createProducer(); createPingDestinations(noOfDestinations, selector, destinationName, unique); createReplyConsumers(getReplyDestinations(), selector); - - // Keep all the remaining options. - _persistent = persistent; - _messageSize = messageSize; - _verbose = verbose; - _failAfterCommit = afterCommit; - _failBeforeCommit = beforeCommit; - _failAfterSend = afterSend; - _failBeforeSend = beforeSend; - _failOnce = failOnce; - _txBatchSize = txBatchSize; - _isPubSub = pubsub; } /** @@ -407,6 +409,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * to be started to bounce the pings back again. * * @param args The command line arguments. + * @throws Exception When something went wrong with the test */ public static void main(String[] args) throws Exception { @@ -421,7 +424,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "test"; + String virtualpath = "/test"; String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector(); boolean verbose = true; boolean transacted = config.isTransacted(); @@ -479,9 +482,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create a ping producer to handle the request/wait/reply cycle. PingPongProducer pingProducer = - new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector, - transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize, destCount, rate, pubsub, false); + new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector, + transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, + beforeSend, failOnce, batchSize, destCount, rate, pubsub, false); pingProducer.getConnection().start(); @@ -511,7 +514,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime); } catch (InterruptedException ie) - { } + { + //ignore + } } } @@ -555,11 +560,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param rootName The root of the name, or actual name if only one is being created. * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share * the numbering with all pingers on the same JVM. - * * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique) - throws JMSException + throws JMSException { _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " @@ -568,28 +572,32 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create the desired number of ping destinations and consumers for them. for (int i = 0; i < noOfDestinations; i++) { - AMQDestination destination = null; + AMQDestination destination; int id; // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. if (unique) { + _logger.debug("Creating unique destinations."); id = _queueJVMSequenceID.incrementAndGet(); } else { + _logger.debug("Creating shared destinations."); id = _queueSharedId.incrementAndGet(); } // Check if this is a pub/sub pinger, in which case create topics. if (_isPubSub) { + _logger.debug("Creating topics."); destination = new AMQTopic(rootName + id); } // Otherwise this is a p2p pinger, in which case create queues. else { + _logger.debug("Creating queues."); destination = new AMQQueue(rootName + id); } @@ -697,11 +705,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout. */ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException { @@ -723,14 +730,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); @@ -743,7 +749,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // chained message listener must be called before this sender can be unblocked, but that decrementing the // countdown needs to be done before the chained listener can be called. PerCorrelationId perCorrelationId = new PerCorrelationId(); - perCorrelationId.trafficLight = new CountDownLatch(numPings + 1); + + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine @@ -763,11 +770,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. - numReplies = numPings - (int) perCorrelationId.trafficLight.getCount(); - allMessagesReceived = numReplies >= numPings; + numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + + allMessagesReceived = numReplies == getExpectedNumPings(numPings); - _logger.debug("numReplies = "+ numReplies); - _logger.debug("allMessagesReceived = "+ allMessagesReceived); + _logger.debug("numReplies = " + numReplies); + _logger.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); @@ -779,7 +787,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } while (!timedOut && !allMessagesReceived); - if ((numReplies < numPings) && _verbose) + if ((numReplies < getExpectedNumPings(numPings)) && _verbose) { _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } @@ -808,7 +816,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send. * @param numPings The number of pings to send. * @param messageCorrelationId A correlation id to place on all messages sent. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException @@ -864,9 +871,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } - /** - * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. - */ + /** The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */ public void pingLoop() { try @@ -909,9 +914,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _chainedMessageListener = messageListener; } - /** - * Removes any chained message listeners from this pinger. - */ + /** Removes any chained message listeners from this pinger. */ public void removeChainedMessageListener() { _chainedMessageListener = null; @@ -923,9 +926,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise. - * * @return A freshly generated test message. - * * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. */ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException @@ -947,9 +948,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _publish = false; } - /** - * Implements a ping loop that repeatedly pings until the publish flag becomes false. - */ + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ public void run() { // Keep running until the publish flag is cleared. @@ -980,12 +979,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook() { return new Thread(new Runnable() + { + public void run() { - public void run() - { - stop(); - } - }); + stop(); + } + }); } /** @@ -1003,7 +1002,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param destinations The destinations to listen to. * @param selector A selector to filter the messages with. - * * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. */ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException @@ -1015,8 +1013,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Create a consumer for the destination and set this pinger to listen to its messages. MessageConsumer consumer = - _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE, - selector); + _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE, + selector); consumer.setMessageListener(this); } } @@ -1039,19 +1037,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Convenience method to commit the transaction on the specified session. If the session to commit on is not * a transactional session, this method does nothing (unless the failover after send flag is set). - * + * <p/> * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit * is applied. This flag applies whether the pinger is transactional or not. - * + * <p/> * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker * after the commit is applied. These flags will only apply if using a transactional pinger. * + * @param session The session to commit * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional - * and non-transactional alike. + * <p/> + * //todo @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional + * and non-transactional alike. */ protected void commitTx(Session session) throws JMSException { @@ -1132,7 +1131,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param destination The destination to send to. * @param message The message to send. - * * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. */ protected void sendMessage(Destination destination, Message message) throws JMSException @@ -1170,17 +1168,35 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis System.in.read(); } catch (IOException e) - { } + { + //ignore + } System.out.println("Continuing."); } /** + * This value will be changed by PingClient to represent the number of clients connected to each topic. + * + * @return int The number of consumers subscribing to each topic. + */ + public int getConsumersPerTopic() + { + return _consumersPerTopic; + } + + public int getExpectedNumPings(int numpings) + { + return numpings * getConsumersPerTopic(); + } + + + /** * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's * {@link PingPongProducer#onMessage} method is called, the chained listener set through the * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected * count of messages with that correlation id. - * + * <p/> * Provided only one pinger is producing messages with that correlation id, the chained listener will always be * given unique message counts. It will always be called while the producer waiting for all messages to arrive is * still blocked. diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index c01987cfc0..347031ff51 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -70,7 +70,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ private Map<String, PerCorrelationId> perCorrelationIds = - Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; @@ -91,6 +91,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. */ public static Test suite() { @@ -128,6 +129,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA * all replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness */ public void testAsyncPingOk(int numPings) throws Exception { @@ -151,7 +153,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these @@ -160,18 +162,18 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Generate a sample message of the specified size. ObjectMessage msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId); // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) + if (numReplies < perCorrelationId._expectedCount) { - tc.completeTest(false, numPings - numReplies); + tc.completeTest(false, numPings - perCorrelationId._expectedCount); } // Remove the chained message listener from the ping producer. diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index f81ff531da..6d024a189d 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -110,6 +110,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware /** * Compile all the tests into a test suite. + * @return The test method testPingOk. */ public static Test suite() { @@ -139,18 +140,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger( - PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean( - PingPongProducer.PERSISTENT_MODE_PROPNAME)); + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout); // Fail the test if the timeout was exceeded. - if (numReplies != numPings) + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) { Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); @@ -191,7 +192,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Extract the test set up paramaeters. int destinationscount = - Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); + Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); // This is synchronized because there is a race condition, which causes one connection to sleep if // all threads try to create connection concurrently. diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java index fe8960c872..1a2817a7b2 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -52,7 +52,7 @@ public class ConcurrencyTest extends MessageTestHelper public ConcurrencyTest() throws Exception { - _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, new DefaultQueueRegistry())); } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java index 3072d44f48..b3d1eb1325 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java @@ -21,10 +21,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest { @@ -33,7 +29,7 @@ public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest try { System.setProperty("concurrentdeliverymanager","true"); - _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, + _mgr = new ConcurrentSelectorDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, new DefaultQueueRegistry())); } catch (Throwable t) diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index 3631264e5a..858547a05d 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -169,7 +169,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { TestSuite suite = new TestSuite(); suite.addTestSuite(ConcurrentDeliveryManagerTest.class); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); return suite; } } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java deleted file mode 100644 index ebe8e192a0..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.queue.SynchronizedDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; -import org.apache.qpid.AMQException; - -import junit.framework.TestSuite; - -public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest -{ - public SynchronizedDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","false"); - _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - TestSuite suite = new TestSuite(); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); - return suite; - } -} |