diff options
Diffstat (limited to 'qpid/java')
93 files changed, 4358 insertions, 405 deletions
diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd index 6e005f5bdb..f49578ba8c 100755 --- a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.9.0 +ver: 0.11.0 Bundle-SymbolicName: qpid-shutdown-plugin Bundle-Version: ${ver} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index a6f319cb1f..5192d5be6f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1096,6 +1096,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return Boolean.FALSE; } + public Long getFlowStoppedCount() + { + return 0L; + } + public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, final Long request) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index a612f280d6..d1ea5dba69 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -327,4 +327,74 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { return getObjectNameForSingleInstanceMBean(); } + + public void resetStatistics() throws Exception + { + getVirtualHost().resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return getVirtualHost().getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return getVirtualHost().getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return getVirtualHost().getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return getVirtualHost().getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return getVirtualHost().getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return getVirtualHost().getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return getVirtualHost().getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return getVirtualHost().getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return getVirtualHost().getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return getVirtualHost().getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return getVirtualHost().getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return getVirtualHost().getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return getVirtualHost().isStatisticsEnabled(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4f86c82578..dd3046cd01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -141,6 +142,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -200,6 +202,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return !(_transaction instanceof AutoCommitTransaction); } + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -295,7 +302,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel }); deliverCurrentMessageIfComplete(); - } } @@ -333,11 +339,15 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } } } finally { + long bodySize = _currentMessage.getSize(); + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().properties).getTimestamp(); + _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -794,6 +804,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); + updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -968,6 +979,17 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } + /** + * Update last transaction activity timestamp + */ + private void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + public String toString() { return "["+_session.toString()+":"+_channelId+"]"; @@ -1018,6 +1040,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); + _session.registerMessageDelivered(entry.getMessage().getSize()); } }; @@ -1407,4 +1430,36 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _session.mgmtCloseChannel(_channelId); } + + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); + _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 71cf17ed60..9d3c4dd2e8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -320,8 +320,7 @@ public class Main ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); configMBean.register(); - ServerInformationMBean sysInfoMBean = - new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); + ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config); sysInfoMBean.register(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 7197ec8cdc..43be0611a5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -767,6 +767,36 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa DEFAULT_HOUSEKEEPING_PERIOD)); } + public long getStatisticsSamplePeriod() + { + return getConfig().getLong("statistics.sample.period", 5000L); + } + + public boolean isStatisticsGenerationBrokerEnabled() + { + return getConfig().getBoolean("statistics.generation.broker", false); + } + + public boolean isStatisticsGenerationVirtualhostsEnabled() + { + return getConfig().getBoolean("statistics.generation.virtualhosts", false); + } + + public boolean isStatisticsGenerationConnectionsEnabled() + { + return getConfig().getBoolean("statistics.generation.connections", false); + } + + public long getStatisticsReportingPeriod() + { + return getConfig().getLong("statistics.reporting.period", 0L); + } + + public boolean isStatisticsReportResetEnabled() + { + return getConfig().getBoolean("statistics.reporting.reset", false); + } + public NetworkDriverConfiguration getNetworkConfiguration() { return new NetworkDriverConfiguration() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index d9d7083543..48f2d776bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -313,4 +313,24 @@ public class VirtualHostConfiguration extends ConfigurationPlugin { return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors()); } + + public long getTransactionTimeoutOpenWarn() + { + return getLongValue("transactionTimeout.openWarn", 0L); + } + + public long getTransactionTimeoutOpenClose() + { + return getLongValue("transactionTimeout.openClose", 0L); + } + + public long getTransactionTimeoutIdleWarn() + { + return getLongValue("transactionTimeout.idleWarn", 0L); + } + + public long getTransactionTimeoutIdleClose() + { + return getLongValue("transactionTimeout.idleClose", 0L); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index bac751e0c8..c06305ee4e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,19 +20,19 @@ */ package org.apache.qpid.server.connection; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQConnectionModel; public class ConnectionRegistry implements IConnectionRegistry, Closeable { - private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); + private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); private Logger _logger = Logger.getLogger(ConnectionRegistry.class); @@ -40,44 +40,42 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { // None required } - - public void expireClosedChannels() - { - for (AMQProtocolSession connection : _registry) - { - connection.closeIfLingeringClosedChannels(); - } - } /** Close all of the currently open connections. */ public void close() { while (!_registry.isEmpty()) { - AMQProtocolSession connection = _registry.get(0); - - try - { - connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", - 0, 0, - connection.getProtocolOutputConverter().getProtocolMajorVersion(), - connection.getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); - } - catch (AMQException e) - { - _logger.warn("Error closing connection:" + e.getMessage()); - } + AMQConnectionModel connection = _registry.get(0); + closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + } + } + + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) + { + try + { + connection.close(cause, message); + } + catch (AMQException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); } } - public void registerConnection(AMQProtocolSession connnection) + public void registerConnection(AMQConnectionModel connnection) { _registry.add(connnection); } - public void deregisterConnection(AMQProtocolSession connnection) + public void deregisterConnection(AMQConnectionModel connnection) { _registry.remove(connnection); } + + @Override + public List<AMQConnectionModel> getConnections() + { + return new ArrayList<AMQConnectionModel>(_registry); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 002269bbaa..b4f5bffa57 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,18 +20,23 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.util.List; + import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; public interface IConnectionRegistry { - public void initialise(); public void close() throws AMQException; + + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message); + + public List<AMQConnectionModel> getConnections(); - public void registerConnection(AMQProtocolSession connnection); - - public void deregisterConnection(AMQProtocolSession connnection); + public void registerConnection(AMQConnectionModel connnection); + public void deregisterConnection(AMQConnectionModel connnection); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java index 7aeff2561e..0f1b709475 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java @@ -90,12 +90,12 @@ public abstract class AbstractExchangeMBean<T extends AbstractExchange> extends public String getObjectInstanceName() { - return _exchange.getNameShortString().toString(); + return ObjectName.quote(_exchange.getName()); } public String getName() { - return _exchange.getNameShortString().toString(); + return _exchange.getName(); } public String getExchangeType() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java index db2cc970b2..5e6a143d52 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java @@ -22,9 +22,11 @@ package org.apache.qpid.server.information.management; import java.io.IOException; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.JMException; @@ -34,12 +36,15 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn { private String buildVersion; private String productVersion; + private ApplicationRegistry registry; - public ServerInformationMBean(String buildVersion, String productVersion) throws JMException + public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException { super(ServerInformation.class, ServerInformation.TYPE); - this.buildVersion = buildVersion; - this.productVersion = productVersion; + + registry = applicationRegistry; + buildVersion = QpidProperties.getBuildVersion(); + productVersion = QpidProperties.getReleaseVersion(); } public String getObjectInstanceName() @@ -67,5 +72,75 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn return productVersion; } + + public void resetStatistics() throws Exception + { + registry.resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return registry.getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return registry.getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return registry.getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return registry.getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return registry.getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return registry.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return registry.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return registry.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return registry.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return registry.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return registry.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return registry.getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return registry.isStatisticsEnabled(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties index 6b83a7e7a5..5d1e85fe41 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties @@ -32,4 +32,7 @@ STOPPED = BRK-1005 : Stopped # 0 - path CONFIG = BRK-1006 : Using configuration : {0} # 0 - path -LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
\ No newline at end of file +LOG_CONFIG = BRK-1007 : Using logging configuration : {0} + +STATS_DATA = BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total +STATS_MSGS = BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index 53bcd712f2..ed8c0d0ce9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -28,3 +28,7 @@ PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} # 0 - queue causing flow control FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0}) FLOW_REMOVED = CHN-1006 : Flow Control Removed +# Channel Transactions +# 0 - time in milliseconds +OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms +IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties index 66bbefacb0..3e640c7929 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties @@ -20,4 +20,7 @@ # # 0 - name CREATED = VHT-1001 : Created : {0} -CLOSED = VHT-1002 : Closed
\ No newline at end of file +CLOSED = VHT-1002 : Closed + +STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total +STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total`
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index 0a739af695..7924964fdf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -153,32 +153,4 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana return ""; } - protected static StringBuffer jmxEncode(StringBuffer jmxName, int attrPos) - { - for (int i = attrPos; i < jmxName.length(); i++) - { - if (jmxName.charAt(i) == ',') - { - jmxName.setCharAt(i, ';'); - } - else if (jmxName.charAt(i) == ':') - { - jmxName.setCharAt(i, '-'); - } - else if (jmxName.charAt(i) == '?' || - jmxName.charAt(i) == '*' || - jmxName.charAt(i) == '\\') - { - jmxName.insert(i, '\\'); - i++; - } - else if (jmxName.charAt(i) == '\n') - { - jmxName.insert(i, '\\'); - i++; - jmxName.setCharAt(i, 'n'); - } - } - return jmxName; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java index 964b5ed5a0..380f51e308 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -213,6 +213,20 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler, Notificati ObjectName object = (ObjectName) args[0]; String vhost = object.getKeyProperty("VirtualHost"); + if(vhost != null) + { + try + { + //if the name is quoted in the ObjectName, unquote it + vhost = ObjectName.unquote(vhost); + } + catch(IllegalArgumentException e) + { + //ignore, this just means the name is not quoted + //and can be left unchanged + } + } + return vhost; } return null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index a6bab017a1..4e8d64a136 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -63,7 +63,7 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String QPID_VER_SUFFIX = "version=0.9,"; + private static final String QPID_VER_SUFFIX = "version=0.11,"; private Framework _felix; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index bcda385f64..061ebf50cd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -20,14 +20,35 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.AMQConstant; +import java.util.List; +import java.util.UUID; + import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; -public interface AMQConnectionModel +public interface AMQConnectionModel extends StatisticsGatherer { + /** + * get a unique id for this connection. + * + * @return a {@link UUID} representing the connection + */ + public UUID getId(); + + /** + * Close the underlying Connection + * + * @param cause + * @param message + * @throws org.apache.qpid.AMQException + */ + public void close(AMQConstant cause, String message) throws AMQException; /** * Close the given requested Session + * * @param session * @param cause * @param message @@ -36,4 +57,16 @@ public interface AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; public long getConnectionId(); + + /** + * Get a list of all sessions using this connection. + * + * @return a list of {@link AMQSessionModel}s + */ + public List<AMQSessionModel> getSessionModels(); + + /** + * Return a {@link LogSubject} for the connection. + */ + public LogSubject getLogSubject(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index a1ffe272fd..449f698c48 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -92,6 +91,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.NetworkDriver; @@ -172,6 +172,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private final UUID _id; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); + + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; public ManagedObject getManagedObject() { @@ -195,9 +199,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _configStore = virtualHostRegistry.getConfigStore(); _id = _configStore.createId(); - _actor.message(ConnectionMessages.OPEN(null, null, false, false)); + _registry = virtualHostRegistry.getApplicationRegistry(); + initialiseStatistics(); } private AMQProtocolSessionMBean createMBean() throws JMException @@ -1078,19 +1083,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return (_clientVersion == null) ? null : _clientVersion.toString(); } - public void closeIfLingeringClosedChannels() - { - for (Entry<Integer, Long>id : _closingChannelsList.entrySet()) - { - if (id.getValue() + 30000 > System.currentTimeMillis()) - { - // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection - _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); - closeProtocolSession(); - } - } - } - public Boolean isIncoming() { return true; @@ -1263,7 +1255,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - closeChannel((Integer)session.getID()); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1274,5 +1265,97 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol 0,0); writeFrame(responseBody.generateFrame((Integer)session.getID())); - } + } + + public void close(AMQConstant cause, String message) throws AMQException + { + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getProtocolOutputConverter().getProtocolMajorVersion(), + getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + + public List<AMQSessionModel> getSessionModels() + { + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + for (AMQChannel channel : getChannels()) + { + sessions.add((AMQSessionModel) channel); + } + return sessions; + } + + public LogSubject getLogSubject() + { + return _logSubject; + } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _virtualHost.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); + _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); + _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); + _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index f48a214933..c64ed4ad5a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -231,7 +231,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin List<AMQChannel> getChannels(); - void closeIfLingeringClosedChannels(); - void mgmtCloseChannel(int channelId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index f4f2cab2c2..fcac78fafa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -37,25 +37,15 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.management.common.mbeans.ManagedConnection; -import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; -import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; +import java.util.Date; +import java.util.List; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.NotCompliantMBeanException; import javax.management.Notification; +import javax.management.ObjectName; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -66,8 +56,20 @@ import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import java.util.Date; -import java.util.List; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; /** * This MBean class implements the management interface. In order to make more attributes, operations and notifications @@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed super(ManagedConnection.class, ManagedConnection.TYPE); _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); - remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; - _name = jmxEncode(new StringBuffer(remote), 0).toString(); + _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote; init(); } @@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getObjectInstanceName() { - return _name; + return ObjectName.quote(_name); } /** @@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed _broadcaster.sendNotification(n); } -} // End of MBean class + public void resetStatistics() throws Exception + { + _protocolSession.resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return _protocolSession.getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return _protocolSession.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return _protocolSession.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return _protocolSession.getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return _protocolSession.isStatisticsEnabled(); + } + + public void setStatisticsEnabled(boolean enabled) + { + _protocolSession.setStatisticsEnabled(enabled); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a9b2354d75..bc63403a86 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,15 +20,35 @@ */ package org.apache.qpid.server.protocol; +import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; public interface AMQSessionModel { - Object getID(); + public Object getID(); - AMQConnectionModel getConnectionModel(); + public AMQConnectionModel getConnectionModel(); - String getClientID(); + public String getClientID(); + + public void close() throws AMQException; - LogSubject getLogSubject(); + public LogSubject getLogSubject(); + + /** + * This method is called from the housekeeping thread to check the status of + * transactions on this session and react appropriately. + * + * If a transaction is open for too long or idle for too long then a warning + * is logged or the connection is closed, depending on the configuration. An open + * transaction is one that has recent activity. The transaction age is counted + * from the time the transaction was started. An idle transaction is one that + * has had no activity, such as publishing or acknowledgeing messages. + * + * @param openWarn time in milliseconds before alerting on open transaction + * @param openClose time in milliseconds before closing connection with open transaction + * @param idleWarn time in milliseconds before alerting on idle transaction + * @param idleClose time in milliseconds before closing connection with idle transaction + */ + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index b5294b6d2f..77c4e8fc23 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -43,6 +43,7 @@ import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; +import javax.management.ObjectName; import javax.management.OperationsException; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.ArrayType; @@ -97,7 +98,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { super(ManagedQueue.class, ManagedQueue.TYPE); _queue = queue; - _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString(); + _queueName = queue.getName(); } public ManagedObject getParentObject() @@ -147,7 +148,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public String getObjectInstanceName() { - return _queueName; + return ObjectName.quote(_queueName); } public String getName() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 78a642f22f..72b2a68450 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.registry; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; @@ -41,11 +43,12 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.AbstractRootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; @@ -54,6 +57,7 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -104,6 +108,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private ConfigStore _configStore; protected String _registryName; + + private Timer _reportingTimer; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; static { @@ -294,6 +302,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { initialiseVirtualHosts(); + initialiseStatistics(); + initialiseStatisticsReporting(); } finally { @@ -320,6 +330,72 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _managedObjectRegistry = new NoopManagedObjectRegistry(); } + + public void initialiseStatisticsReporting() + { + long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms + final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled(); + final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled(); + final boolean reset = _configuration.isStatisticsReportResetEnabled(); + + /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ + if (report > 0L && (broker || virtualhost)) + { + _reportingTimer = new Timer("Statistics-Reporting", true); + + class StatisticsReportingTask extends TimerTask + { + private final int DELIVERED = 0; + private final int RECEIVED = 1; + + public void run() + { + CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { + public String getLogMessage() + { + return "[" + Thread.currentThread().getName() + "] "; + } + }); + + if (broker) + { + CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); + } + + if (virtualhost) + { + for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) + { + String name = vhost.getName(); + StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); + + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); + } + } + + if (reset) + { + resetStatistics(); + } + + CurrentActor.remove(); + } + } + + _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(), + report / 2, + report); + } + } public static IApplicationRegistry getInstance() { @@ -369,6 +445,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _logger.info("Shutting down ApplicationRegistry:" + this); } + + //Stop Statistics Reporting + if (_reportingTimer != null) + { + _reportingTimer.cancel(); + } //Stop incoming connections unbind(); @@ -498,4 +580,76 @@ public abstract class ApplicationRegistry implements IApplicationRegistry getBroker().addVirtualHost(virtualHost); return virtualHost; } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + + for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) + { + vhost.resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + getConfiguration().isStatisticsGenerationBrokerEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered"); + _dataDelivered = new StatisticsCounter("bytes-delivered"); + _messagesReceived = new StatisticsCounter("messages-received"); + _dataReceived = new StatisticsCounter("bytes-received"); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 228c3b9112..0ef55097ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -35,11 +35,12 @@ import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public interface IApplicationRegistry +public interface IApplicationRegistry extends StatisticsGatherer { /** * Initialise the application registry. All initialisation must be done in this method so that any components @@ -97,4 +98,6 @@ public interface IApplicationRegistry ConfigStore getConfigStore(); void setConfigStore(ConfigStore store); + + void initialiseStatisticsReporting(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java new file mode 100644 index 0000000000..b732121180 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class collects statistics and counts the total, rate per second and + * peak rate per second values for the events that are registered with it. + */ +public class StatisticsCounter +{ + private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class); + + public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s + public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable"); + + private static final String COUNTER = "counter"; + private static final AtomicLong _counterIds = new AtomicLong(0L); + + private long _peak = 0L; + private long _total = 0L; + private long _temp = 0L; + private long _last = 0L; + private long _rate = 0L; + + private long _start; + + private final long _period; + private final String _name; + + public StatisticsCounter() + { + this(COUNTER); + } + + public StatisticsCounter(String name) + { + this(name, DEFAULT_SAMPLE_PERIOD); + } + + public StatisticsCounter(String name, long period) + { + _period = period; + _name = name + "-" + + _counterIds.incrementAndGet(); + reset(); + } + + public void registerEvent() + { + registerEvent(1L); + } + + public void registerEvent(long value) + { + registerEvent(value, System.currentTimeMillis()); + } + + public void registerEvent(long value, long timestamp) + { + if (DISABLE_STATISTICS) + { + return; + } + + long thisSample = (timestamp / _period); + synchronized (this) + { + if (thisSample > _last) + { + _last = thisSample; + _rate = _temp; + _temp = 0L; + if (_rate > _peak) + { + _peak = _rate; + } + } + + _total += value; + _temp += value; + } + } + + /** + * Update the current rate and peak - may reset rate to zero if a new + * sample period has started. + */ + private void update() + { + registerEvent(0L, System.currentTimeMillis()); + } + + /** + * Reset + */ + public void reset() + { + _log.info("Resetting statistics for counter: " + _name); + _peak = 0L; + _rate = 0L; + _total = 0L; + _start = System.currentTimeMillis(); + _last = _start / _period; + } + + public double getPeak() + { + update(); + return (double) _peak / ((double) _period / 1000.0d); + } + + public double getRate() + { + update(); + return (double) _rate / ((double) _period / 1000.0d); + } + + public long getTotal() + { + return _total; + } + + public long getStart() + { + return _start; + } + + public Date getStartTime() + { + return new Date(_start); + } + + public String getName() + { + return _name; + } + + public long getPeriod() + { + return _period; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java new file mode 100644 index 0000000000..36fec4025a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +/** + * This interface is to be implemented by any broker business object that + * wishes to gather statistics about messages delivered through it. + * + * These statistics are exposed using a separate JMX Mbean interface, which + * calls these methods to retrieve the underlying {@link StatisticsCounter}s + * and return their attributes. This interface gives a standard way for + * parts of the broker to set up and configure statistics generation. + * <p> + * When creating these objects, there should be a parent/child relationship + * between them, such that the lowest level gatherer can record staticics if + * enabled, and pass on the notification to the parent object to allow higher + * level aggregation. When resetting statistics, this works in the opposite + * direction, with higher level gatherers also resetting all of their children. + */ +public interface StatisticsGatherer +{ + /** + * Initialise the statistics gathering for this object. + * + * This method is responsible for creating any {@link StatisticsCounter} + * objects and for determining whether statistics generation should be + * enabled, by checking broker and system configuration. + * + * @see StatisticsCounter#DISABLE_STATISTICS + */ + void initialiseStatistics(); + + /** + * This method is responsible for registering the receipt of a message + * with the counters, and also for passing this notification to any parent + * {@link StatisticsGatherer}s. If statistics generation is not enabled, + * then this method should simple delegate to the parent gatherer. + * + * @param messageSize the size in bytes of the delivered message + * @param timestamp the time the message was delivered + */ + void registerMessageReceived(long messageSize, long timestamp); + + /** + * This method is responsible for registering the delivery of a message + * with the counters. Message delivery is recorded by the counter using + * the current system time, as opposed to the message timestamp. + * + * @param messageSize the size in bytes of the delivered message + * @see #registerMessageReceived(long, long) + */ + void registerMessageDelivered(long messageSize); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * delivered message statistics. + * + * @return the {@link StatisticsCounter} that counts delivered messages + */ + StatisticsCounter getMessageDeliveryStatistics(); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * received message statistics. + * + * @return the {@link StatisticsCounter} that counts received messages + */ + StatisticsCounter getMessageReceiptStatistics(); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * delivered message size statistics. + * + * @return the {@link StatisticsCounter} that counts delivered bytes + */ + StatisticsCounter getDataDeliveryStatistics(); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * received message size statistics. + * + * @return the {@link StatisticsCounter} that counts received bytes + */ + StatisticsCounter getDataReceiptStatistics(); + + /** + * Reset the counters for this, and any child {@link StatisticsGatherer}s. + */ + void resetStatistics(); + + /** + * Check if this object has statistics generation enabled. + * + * @return true if statistics generation is enabled + */ + boolean isStatisticsEnabled(); + + /** + * Enable or disable statistics generation for this object. + */ + void setStatisticsEnabled(boolean enabled); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index b36ac84cdd..a20436f029 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -97,7 +97,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private FlowCreditManager_0_10 _creditManager; - private StateListener _stateListener = new StateListener() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d2addfde0c..75bd50e3a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -24,6 +24,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -35,12 +38,16 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionCloseCode; import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.Session; public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { @@ -49,11 +56,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicBoolean _logClosed = new AtomicBoolean(false); private LogActor _actor = GenericActor.getInstance(this); + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + public ServerConnection() { } + public UUID getId() + { + return _config.getId(); + } + @Override protected void invoke(Method method) { @@ -110,6 +126,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void setVirtualHost(VirtualHost virtualHost) { _virtualHost = virtualHost; + _virtualHost.getConnectionRegistry().registerConnection(this); + + initialiseStatistics(); } public void setConnectionConfig(final ConnectionConfig config) @@ -145,6 +164,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, ((ServerSession)session).close(); } + + public LogSubject getLogSubject() + { + return (LogSubject) this; + } @Override public void received(ProtocolEvent event) @@ -215,4 +239,100 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _actor; } + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + close(replyCode, message); + getVirtualHost().getConnectionRegistry().deregisterConnection(this); + } + + @Override + public List<AMQSessionModel> getSessionModels() + { + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + for (Session ssn : getChannels()) + { + sessions.add((AMQSessionModel) ssn); + } + return sessions; + } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _virtualHost.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); + _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); + _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); + _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 2b9e92f685..174dcbfa69 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -20,26 +20,28 @@ */ package org.apache.qpid.server.transport; -import org.apache.qpid.transport.*; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.common.ClientProperties; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; + +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; - -import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslException; -import java.util.*; +import org.apache.qpid.transport.*; public class ServerConnectionDelegate extends ServerDelegate { private String _localFQDN; private final IApplicationRegistry _appRegistry; - public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) { this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); @@ -138,6 +140,7 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); } + } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 540ad3fffd..60c94b43c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,12 +20,26 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import static org.apache.qpid.util.Serial.*; -import com.sun.security.auth.UserPrincipal; +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; +import com.sun.security.auth.UserPrincipal; public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { + private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -111,6 +116,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); private Principal _principal; @@ -141,7 +147,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); _principal = new UserPrincipal(connection.getAuthorizationID()); - _reference = new WeakReference(this); + _reference = new WeakReference<Session>(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -160,8 +166,8 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - - _transaction.enqueue(queues,message, new ServerTransaction.Action() + getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -189,6 +195,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo }); incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } @@ -196,6 +203,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo Runnable postIdSettingAction) { invoke(xfr, postIdSettingAction); + getConnectionModel().registerMessageDelivered(xfr.getBodySize()); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -377,6 +385,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo entry.release(); } }); + updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -425,6 +434,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo // theory return !(_transaction instanceof AutoCommitTransaction); } + + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } public void selectTx() { @@ -471,6 +485,17 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } + /** + * Update last transaction activity timestamp + */ + public void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + public Long getTxnStarts() { return _txnStarts.get(); @@ -606,6 +631,38 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo return (LogSubject) this; } + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); + _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } + @Override public String toLogString() { @@ -617,7 +674,5 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo getVirtualHost().getName(), getChannel()) + "] "; - } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 42a3975e24..be659c87ae 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -209,26 +209,27 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.isExclusive()) { + ServerSession s = (ServerSession) session; + queue.setExclusiveOwningSession(s); if(queue.getPrincipalHolder() == null) { - queue.setPrincipalHolder((ServerSession)session); + queue.setPrincipalHolder(s); + queue.setExclusiveOwningSession(s); ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() { - public void doTask(ServerSession session) { if(queue.getPrincipalHolder() == session) { queue.setPrincipalHolder(null); + queue.setExclusiveOwningSession(null); } } }); } - } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -369,7 +370,6 @@ public class ServerSessionDelegate extends SessionDelegate } ssn.processed(xfr); - } @Override @@ -969,10 +969,10 @@ public class ServerSessionDelegate extends SessionDelegate } - if(method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + if (method.hasAutoDelete() + && method.getAutoDelete() + && method.hasExclusive() + && method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task deleteQueueTask = new ServerSession.Task() @@ -999,12 +999,12 @@ public class ServerSessionDelegate extends SessionDelegate } }); } - else if(method.getExclusive()) + if (method.hasExclusive() + && method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task removeExclusive = new ServerSession.Task() { - public void doTask(ServerSession session) { q.setPrincipalHolder(null); @@ -1012,10 +1012,10 @@ public class ServerSessionDelegate extends SessionDelegate } }; final ServerSession s = (ServerSession) session; + q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException { s.removeSessionCloseTask(removeExclusive); @@ -1029,7 +1029,7 @@ public class ServerSessionDelegate extends SessionDelegate } } } - else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session))) + else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index db781ead96..36e9d78440 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -50,6 +50,11 @@ public class AutoCommitTransaction implements ServerTransaction _transactionLog = transactionLog; } + public long getTransactionStartTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index a04c743be1..f9dac782a6 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,18 +20,23 @@ package org.apache.qpid.server.txn; * */ - import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -41,17 +46,28 @@ import org.apache.qpid.server.store.TransactionLog; */ public class LocalTransaction implements ServerTransaction { - protected static final Logger _logger = Logger.getLogger(LocalTransaction.class); + protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class); private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile TransactionLog.Transaction _transaction; private TransactionLog _transactionLog; + private long _txnStartTime = 0L; public LocalTransaction(TransactionLog transactionLog) { _transactionLog = transactionLog; } + + public boolean inTransaction() + { + return _transaction != null; + } + + public long getTransactionStartTime() + { + return _txnStartTime; + } public void addPostTransactionAction(Action postTransactionAction) { @@ -89,7 +105,6 @@ public class LocalTransaction implements ServerTransaction try { - for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); @@ -113,7 +128,6 @@ public class LocalTransaction implements ServerTransaction _logger.error("Error during message dequeues", e); tidyUpOnError(e); } - } private void tidyUpOnError(Exception e) @@ -140,8 +154,7 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } } @@ -193,8 +206,25 @@ public class LocalTransaction implements ServerTransaction { _postTransactionActions.add(postTransactionAction); + if (_txnStartTime == 0L) + { + _txnStartTime = System.currentTimeMillis(); + } + if(message.isPersistent()) { + if(_transaction == null) + { + for(BaseQueue queue : queues) + { + if(queue.isDurable()) + { + beginTranIfNecessary(); + break; + } + } + } + try { for(BaseQueue queue : queues) @@ -248,17 +278,14 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } - } public void rollback() { try { - if(_transaction != null) { _transaction.abortTran(); @@ -280,9 +307,15 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } } } + + private void resetDetails() + { + _transaction = null; + _postTransactionActions.clear(); + _txnStartTime = 0L; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index b61b8a5c64..b3c6e1ac3a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -52,6 +52,13 @@ public interface ServerTransaction public void onRollback(); } + /** + * Return the time the current transaction started. + * + * @return the time this transaction started or 0 if not in a transaction + */ + long getTransactionStartTime(); + /** * Register an Action for execution after transaction commit or rollback. Actions * will be executed in the order in which they are registered. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 4ed0507228..04f19b79bb 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,30 +20,28 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.UUID; + import org.apache.qpid.common.Closeable; +import org.apache.qpid.server.binding.BindingFactory; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.VirtualHostConfig; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfig; -import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.binding.BindingFactory; - -import java.util.List; -import java.util.UUID; -import java.util.TimerTask; -import java.util.concurrent.FutureTask; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLog; -public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable +public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer { IConnectionRegistry getConnectionRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 6ec1c512e5..5374a56f06 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -24,19 +24,18 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -63,6 +62,8 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -72,6 +73,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; @@ -111,6 +113,8 @@ public class VirtualHostImpl implements VirtualHost private BrokerConfig _broker; private UUID _id; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); @@ -161,12 +165,12 @@ public class VirtualHostImpl implements VirtualHost public String getObjectInstanceName() { - return _name.toString(); + return ObjectName.quote(_name); } public String getName() { - return _name.toString(); + return _name; } public VirtualHostImpl getVirtualHost() @@ -249,6 +253,8 @@ public class VirtualHostImpl implements VirtualHost _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + + initialiseStatistics(); } private void initialiseHouseKeeping(long period) @@ -281,19 +287,30 @@ public class VirtualHostImpl implements VirtualHost // house keeping task from running. } } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + for (AMQSessionModel session : connection.getSessionModels()) + { + _logger.debug("Checking for long running open transactions on session " + session); + try + { + session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } } } scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); - class ForceChannelClosuresTask extends TimerTask - { - public void run() - { - _connectionRegistry.expireClosedChannels(); - } - } - Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -627,6 +644,80 @@ public class VirtualHostImpl implements VirtualHost { return _bindingFactory; } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _appRegistry.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _appRegistry.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + + for (AMQConnectionModel connection : _connectionRegistry.getConnections()) + { + connection.resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } public void createBrokerConnection(final String transport, final String host, diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java new file mode 100644 index 0000000000..fbaa1342c9 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.stats; + +import junit.framework.TestCase; + +/** + * Unit tests for the {@link StatisticsCounter} class. + */ +public class StatisticsCounterTest extends TestCase +{ + /** + * Check that statistics counters are created correctly. + */ + public void testCreate() + { + long before = System.currentTimeMillis(); + StatisticsCounter counter = new StatisticsCounter("name", 1234L); + long after = System.currentTimeMillis(); + + assertTrue(before <= counter.getStart()); + assertTrue(after >= counter.getStart()); + assertTrue(counter.getName().startsWith("name-")); + assertEquals(1234L, counter.getPeriod()); + } + + /** + * Check that totals add up correctly. + */ + public void testTotal() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + for (int i = 0; i < 100; i++) + { + counter.registerEvent(i, start + i); + } + assertEquals(99 * 50, counter.getTotal()); // cf. Gauss + } + + /** + * Test totals add up correctly even when messages are delivered + * out-of-order. + */ + public void testTotalOutOfOrder() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0, counter.getTotal()); + counter.registerEvent(10, start + 2500); + assertEquals(10, counter.getTotal()); + counter.registerEvent(20, start + 1500); + assertEquals(30, counter.getTotal()); + counter.registerEvent(10, start + 500); + assertEquals(40, counter.getTotal()); + } + + /** + * Test that the peak rate is reported correctly. + */ + public void testPeak() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + Thread.sleep(500); + counter.registerEvent(1000, start + 500); + Thread.sleep(1000); + assertEquals(1000.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + Thread.sleep(1000); + assertEquals(2000.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + Thread.sleep(1000); + assertEquals(2000.0, counter.getPeak()); + } + + /** + * Test that peak rate is reported correctly for out-of-order messages, + * and the total is also unaffected. + */ + public void testPeakOutOfOrder() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + Thread.sleep(1500); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + Thread.sleep(1000L); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + Thread.sleep(1500); + assertEquals(4000.0, counter.getPeak()); + Thread.sleep(2000); + assertEquals(4000.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + assertEquals(4000.0, counter.getPeak()); + Thread.sleep(2000); + counter.registerEvent(1000); + assertEquals(4000.0, counter.getPeak()); + assertEquals(6000, counter.getTotal()); + } + + /** + * Test the current rate is generated correctly. + */ + public void testRate() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + assertEquals(0.0, counter.getRate()); + Thread.sleep(500); + counter.registerEvent(1000); + Thread.sleep(1000); + assertEquals(1000.0, counter.getRate()); + counter.registerEvent(2000); + Thread.sleep(1000); + assertEquals(2000.0, counter.getRate()); + counter.registerEvent(1000); + Thread.sleep(1000); + assertEquals(1000.0, counter.getRate()); + Thread.sleep(1000); + assertEquals(0.0, counter.getRate()); + } +} diff --git a/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java b/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java index 902b86f80b..a39799a6b6 100644 --- a/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java +++ b/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java @@ -481,7 +481,7 @@ public class GenerateLogMessages // Only check the text inside the braces '{}' int typeIndexEnd = parametersString[index].indexOf("}", typeIndex); String typeString = parametersString[index].substring(typeIndex, typeIndexEnd); - if (typeString.contains("number")) + if (typeString.contains("number") || typeString.contains("choice")) { type = "Number"; } diff --git a/qpid/java/build.xml b/qpid/java/build.xml index 9031166d76..635e72bdd5 100644 --- a/qpid/java/build.xml +++ b/qpid/java/build.xml @@ -93,6 +93,10 @@ <fail if="failed" message="TEST SUITE FAILED"/> </target> + + <target name="report-module" description="generate junitreport for modules"> + <iterate target="report-module"/> + </target> <target name="jar" description="create module jars"> <iterate target="jar"/> diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java index c4edd9034f..88ee9ed2c5 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java @@ -50,7 +50,7 @@ public class ConnectionSetup final static String QUEUE_NAME = "example.MyQueue"; public static final String TOPIC_JNDI_NAME = "topic"; - final static String TOPIC_NAME = "example.hierarical.topic"; + final static String TOPIC_NAME = "usa.news"; private Context _ctx; diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java index dd936e429f..ac3829d49e 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -71,7 +71,7 @@ public class Publisher extends Client public static void main(String[] args) { - String destination = args.length > 2 ? args[1] : null; + String destination = args.length > 2 ? args[1] : "usa.news"; int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd index 0ddd163d4f..d2bc4f8d50 100755 --- a/qpid/java/client/src/main/java/client.bnd +++ b/qpid/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.9.0 +ver: 0.11.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index ee52cd50af..b31dd2bc91 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -105,6 +105,21 @@ public class AMQBrokerDetails implements BrokerDetails if (host == null) { host = ""; + + String auth = connection.getAuthority(); + if (auth != null) + { + // contains both host & port myhost:5672 + if (auth.contains(":")) + { + host = auth.substring(0,auth.indexOf(":")); + } + else + { + host = auth; + } + } + } setHost(host); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index b0bd8f8e97..891819c227 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -69,6 +69,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; + static + { + // Register any configured SASL client factories. + org.apache.qpid.client.security.DynamicSaslRegistrar.registerSaslProviders(); + } + //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 1f940b62f0..5c2949960c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1043,7 +1043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException { checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic, true); + Topic origTopic = checkValidTopic(topic, true); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1307,8 +1307,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination); + Queue dest = validateQueue(destination); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1326,8 +1326,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination, messageSelector); + Queue dest = validateQueue(destination); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1344,7 +1344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue) throws JMSException { checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; + Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1363,11 +1363,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; + Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } + + private Queue validateQueue(Destination dest) throws InvalidDestinationException + { + if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) + { + return (Queue)dest; + } + else + { + throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue"); + } + } public QueueSender createSender(Queue queue) throws JMSException { @@ -1408,7 +1420,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); + Topic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); @@ -1428,7 +1440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); + Topic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); @@ -2395,7 +2407,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /* * I could have combined the last 3 methods, but this way it improves readability */ - protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException + protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException { if (topic == null) { @@ -2414,17 +2426,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQTopic)) + if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } - return (AMQTopic) topic; + return topic; } - protected AMQTopic checkValidTopic(Topic topic) throws JMSException + protected Topic checkValidTopic(Topic topic) throws JMSException { return checkValidTopic(topic, false); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 517a7a5ce8..3f84716df7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -47,6 +47,8 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -56,6 +58,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; @@ -600,10 +603,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + + if (consumer.getDestination().getLink() != null) + { + acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; + } getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), - getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } @@ -1068,22 +1077,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) + public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException { boolean match = true; - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - - if (match && assertNode) + try { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); + QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); + match = dest.getAddressName().equals(result.getQueue()); + + if (match && assertNode) + { + match = (result.getDurable() == node.isDurable()) && + (result.getAutoDelete() == node.isAutoDelete()) && + (result.getExclusive() == node.isExclusive()) && + (matchProps(result.getArguments(),node.getDeclareArgs())); + } + else if (match) + { + // should I use the queried details to update the local data structure. + } } - else if (match) + catch(SessionException e) { - // should I use the queried details to update the local data structure. + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + { + match = false; + } + else + { + throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), + "Error querying queue",e); + } } return match; @@ -1149,6 +1173,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic int type = resolveAddressType(dest); + if (type == AMQDestination.QUEUE_TYPE && + dest.getLink().getReliability() == Reliability.UNSPECIFIED) + { + dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); + } + else if (type == AMQDestination.TOPIC_TYPE && + dest.getLink().getReliability() == Reliability.UNSPECIFIED) + { + dest.getLink().setReliability(Reliability.UNRELIABLE); + } + else if (type == AMQDestination.TOPIC_TYPE && + dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) + { + throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); + } + switch (type) { case AMQDestination.QUEUE_TYPE: diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 6217cb534a..c573da9def 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Topic; @@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); } - public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) + public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection) throws JMSException { - if (topic.getDestSyntax() == DestSyntax.ADDR) + if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic) { - try + AMQDestination qpidTopic = (AMQDestination)topic; + if (qpidTopic.getDestSyntax() == DestSyntax.ADDR) { - AMQTopic t = new AMQTopic(topic.getAddress()); - AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); - // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); - - // The legacy fields are also populated just in case. - t.setQueueName(queueName); - t.setAutoDelete(false); - t.setDurable(true); - return t; + try + { + AMQTopic t = new AMQTopic(qpidTopic.getAddress()); + AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); + // link is never null if dest was created using an address string. + t.getLink().setName(queueName.asString()); + t.getSourceNode().setAutoDelete(false); + t.getSourceNode().setDurable(true); + + // The legacy fields are also populated just in case. + t.setQueueName(queueName); + t.setAutoDelete(false); + t.setDurable(true); + return t; + } + catch(Exception e) + { + JMSException ex = new JMSException("Error creating durable topic"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } } - catch(Exception e) + else { - JMSException ex = new JMSException("Error creating durable topic"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; + return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, + getDurableTopicQueueName(subscriptionName, connection), + true); } } else { - return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, - getDurableTopicQueueName(subscriptionName, connection), - true); + throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0a78403268..5d32863f2f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -571,6 +571,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (!_session.isClosed() || _session.isClosing()) { sendCancel(); + cleanupQueue(); } } catch (AMQException e) @@ -608,6 +609,8 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; + + abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index b5f3501e5a..964c238946 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,8 +19,11 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; +import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -509,4 +512,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _exclusive; } } + + void cleanupQueue() throws AMQException, FailoverException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.RECEIVER ) + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + this.getDestination().getQueueName()); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cdbf57769d..00acd5e866 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -88,4 +88,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } + void cleanupQueue() throws AMQException, FailoverException + { + + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 53c0457120..36f2dfb66f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -19,6 +19,7 @@ package org.apache.qpid.client; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; import java.nio.ByteBuffer; import java.util.HashMap; @@ -30,9 +31,12 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -210,6 +214,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); + boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && + (destination.getLink().getReliability() == Reliability.UNRELIABLE); + org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); @@ -217,7 +224,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProp, messageProps), - buffer, sync ? SYNC : NONE); + buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE); if (sync) { ssn.sync(); @@ -239,5 +246,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { return _session.isQueueBound(destination); } + + @Override + public void close() + { + super.close(); + AMQDestination dest = _destination; + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + _destination.getQueueName()); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 27783bcacf..295c6a4091 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -50,25 +50,25 @@ public class QueueSenderAdapter implements QueueSender public void send(Message msg) throws JMSException { - checkPreConditions(); + checkQueuePreConditions(_queue); _delegate.send(msg); } public void send(Queue queue, Message msg) throws JMSException { - checkPreConditions(queue); + checkQueuePreConditions(queue); _delegate.send(queue, msg); } public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); + checkQueuePreConditions(_queue); _delegate.send(msg, deliveryMode, priority, timeToLive); } public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(queue); + checkQueuePreConditions(queue); _delegate.send(queue, msg, deliveryMode, priority, timeToLive); } @@ -122,19 +122,19 @@ public class QueueSenderAdapter implements QueueSender public void send(Destination dest, Message msg) throws JMSException { - checkPreConditions((Queue) dest); + checkQueuePreConditions((Queue) dest); _delegate.send(dest, msg); } public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); + checkQueuePreConditions(_queue); _delegate.send(msg, deliveryMode, priority, timeToLive); } public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions((Queue) dest); + checkQueuePreConditions((Queue) dest); _delegate.send(dest, msg, deliveryMode, priority, timeToLive); } @@ -170,11 +170,6 @@ public class QueueSenderAdapter implements QueueSender private void checkPreConditions() throws JMSException { - checkPreConditions(_queue); - } - - private void checkPreConditions(Queue queue) throws JMSException - { if (closed) { throw new javax.jms.IllegalStateException("Publisher is closed"); @@ -186,39 +181,43 @@ public class QueueSenderAdapter implements QueueSender { throw new javax.jms.IllegalStateException("Invalid Session"); } + } - if (queue == null) - { - throw new UnsupportedOperationException("Queue is null."); - } - - if (!(queue instanceof AMQDestination)) - { - throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); - } - - AMQDestination destination = (AMQDestination) queue; - if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish()) - { - - if (_delegate.getSession().isStrictAMQP()) - { - _delegate._logger.warn("AMQP does not support destination validation before publish, "); - destination.setCheckedForQueueBinding(true); - } - else - { - if (_delegate.isBound(destination)) - { - destination.setCheckedForQueueBinding(true); - } - else - { - throw new InvalidDestinationException("Queue: " + queue - + " is not a valid destination (no bindings on server"); - } - } - } + private void checkQueuePreConditions(Queue queue) throws JMSException + { + checkPreConditions() ; + + if (queue == null) + { + throw new UnsupportedOperationException("Queue is null."); + } + + if (!(queue instanceof AMQDestination)) + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); + } + + AMQDestination destination = (AMQDestination) queue; + if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish()) + { + if (_delegate.getSession().isStrictAMQP()) + { + _delegate._logger.warn("AMQP does not support destination validation before publish, "); + destination.setCheckedForQueueBinding(true); + } + else + { + if (_delegate.isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + + " is not a valid destination (no bindings on server"); + } + } + } } private boolean checkQueueBeforePublish() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 92e61984d2..fb7b191656 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -22,10 +22,12 @@ package org.apache.qpid.client.message; import java.lang.ref.SoftReference; +import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -670,7 +672,19 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate public Enumeration getPropertyNames() throws JMSException { - return java.util.Collections.enumeration(getApplicationHeaders().keySet()); + List<String> props = new ArrayList<String>(); + Map<String, Object> propertyMap = getApplicationHeaders(); + for (String prop: getApplicationHeaders().keySet()) + { + Object value = propertyMap.get(prop); + if (value instanceof Boolean || value instanceof Number + || value instanceof String) + { + props.add(prop); + } + } + + return java.util.Collections.enumeration(props); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 6e22292ee0..58f108f1a4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -65,7 +66,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) - || (value instanceof List) || (value instanceof Map) || (value == null)) + || (value instanceof List) || (value instanceof Map) || (value instanceof UUID) || (value == null)) { _map.put(propName, value); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 00503cc650..e454a8eee4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; @@ -262,7 +263,7 @@ public class AddressHelper } } - public Link getLink() + public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); @@ -272,6 +273,25 @@ public class AddressHelper : linkProps.getBoolean(DURABLE)); link.setName(linkProps.getString(NAME)); + String reliability = linkProps.getString(RELIABILITY); + if ( reliability != null) + { + if (reliability.equalsIgnoreCase("unreliable")) + { + link.setReliability(Reliability.UNRELIABLE); + } + else if (reliability.equalsIgnoreCase("at-least-once")) + { + link.setReliability(Reliability.AT_LEAST_ONCE); + } + else + { + throw new Exception("The reliability mode '" + + reliability + "' is not yet supported"); + } + + } + if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index a7d19d1bd5..5f97d625b4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.messaging.address; +import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; + import java.util.HashMap; import java.util.Map; @@ -29,6 +31,8 @@ public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } + public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } + protected String name; protected String _filter; protected FilterType _filterType = FilterType.SUBJECT; @@ -38,7 +42,18 @@ public class Link protected int _producerCapacity = 0; protected Node node; protected Subscription subscription; + protected Reliability reliability = UNSPECIFIED; + public Reliability getReliability() + { + return reliability; + } + + public void setReliability(Reliability reliability) + { + this.reliability = reliability; + } + public Node getNode() { return node; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties index 1bff43142b..b903208927 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties @@ -18,3 +18,4 @@ # AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory +ANONYMOUS=org.apache.qpid.client.security.anonymous.AnonymousSaslClientFactory diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java new file mode 100644 index 0000000000..0f56b2ef6c --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security.anonymous; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +public class AnonymousSaslClient implements SaslClient +{ + public String getMechanismName() { + return "ANONYMOUS"; + } + public boolean hasInitialResponse() { + return true; + } + public byte[] evaluateChallenge(byte[] challenge) throws SaslException { + return new byte[0]; + } + public boolean isComplete() { + return true; + } + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + throw new IllegalStateException("No security layer supported"); + } + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + throw new IllegalStateException("No security layer supported"); + } + public Object getNegotiatedProperty(String propName) { + return null; + } + public void dispose() throws SaslException {} +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java new file mode 100644 index 0000000000..de698f87c6 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security.anonymous; + +import java.util.Arrays; +import java.util.Map; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslException; +import javax.security.auth.callback.CallbackHandler; + +public class AnonymousSaslClientFactory implements SaslClientFactory +{ + public SaslClient createSaslClient(String[] mechanisms, String authId, + String protocol, String server, + Map props, CallbackHandler cbh) throws SaslException + { + if (Arrays.asList(mechanisms).contains("ANONYMOUS")) { + return new AnonymousSaslClient(); + } else { + return null; + } + } + public String[] getMechanismNames(Map props) + { + if (props == null || props.isEmpty()) { + return new String[]{"ANONYMOUS"}; + } else { + return new String[0]; + } + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 2be3720c20..2c5fa0112e 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -549,6 +549,37 @@ public class ConnectionURLTest extends TestCase assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'")); } + public void testHostNamesWithUnderScore() throws URLSyntaxException + { + String url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score:6672'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + assertTrue(connectionurl.getBrokerCount() == 1); + BrokerDetails service = connectionurl.getBrokerDetails(0); + assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getHost().equals("under_score")); + assertTrue(service.getPort() == 6672); + + url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'"; + + connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + assertTrue(connectionurl.getBrokerCount() == 1); + service = connectionurl.getBrokerDetails(0); + assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getHost().equals("under_score")); + assertTrue(service.getPort() == 5672); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); diff --git a/qpid/java/common.xml b/qpid/java/common.xml index 066859b29f..a8d9c0dea4 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -23,7 +23,7 @@ <dirname property="project.root" file="${ant.file.common}"/> <property name="project.name" value="qpid"/> - <property name="project.version" value="0.9"/> + <property name="project.version" value="0.11"/> <property name="project.namever" value="${project.name}-${project.version}"/> <property name="resources" location="${project.root}/resources"/> diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd index ef56ecec9e..89c397f400 100755 --- a/qpid/java/common/src/main/java/common.bnd +++ b/qpid/java/common/src/main/java/common.bnd @@ -17,7 +17,7 @@ # under the License.
#
-ver: 0.9.0
+ver: 0.11.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e5e10c0e07..dc32569ee8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -28,6 +28,7 @@ import static org.apache.qpid.transport.Connection.State.OPENING; import static org.apache.qpid.transport.Connection.State.RESUMING; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -695,4 +696,8 @@ public class Connection extends ConnectionInvoker return connectionLost.get(); } + protected Collection<Session> getChannels() + { + return channels.values(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 214d4534c1..d961507382 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -422,7 +422,10 @@ public class Session extends SessionInvoker { return; } - sessionCompleted(copy, options); + if (copy.size() > 0) + { + sessionCompleted(copy, options); + } } } @@ -661,7 +664,12 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener()) + + boolean replayTransfer = !closing && !transacted && + m instanceof MessageTransfer && + ! m.isUnreliable(); + + if ((replayTransfer) || m.hasCompletionListener()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 908d14a307..0ccfcfcb70 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -63,6 +63,7 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(Double.class, Type.DOUBLE); ENCODINGS.put(Character.class, Type.CHAR); ENCODINGS.put(byte[].class, Type.VBIN32); + ENCODINGS.put(UUID.class, Type.UUID); } private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>() diff --git a/qpid/java/management/common/src/main/java/management-common.bnd b/qpid/java/management/common/src/main/java/management-common.bnd index cb28d309a6..66eb9f156b 100644 --- a/qpid/java/management/common/src/main/java/management-common.bnd +++ b/qpid/java/management/common/src/main/java/management-common.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.9.0 +ver: 0.11.0 Bundle-SymbolicName: qpid-management-common Bundle-Version: ${ver} diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java index c0e7293611..b48ad3f856 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java @@ -36,8 +36,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame * The ManagedBroker is the management interface to expose management * features of the Broker. * - * @author Bhupendra Bhardwaj - * @version 0.1 + * @version Qpid JMX API 2.1 + * @since Qpid JMX API 1.3 */ public interface ManagedBroker { @@ -131,4 +131,118 @@ public interface ManagedBroker impact= MBeanOperationInfo.ACTION) void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName) throws IOException, JMException, MBeanException; + + /** + * Resets all message and data statistics for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanOperation(name="resetStatistics", + description="Resets all message and data statistics for the virtual host", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); + + /** + * Peak rate of bytes delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); + + /** + * Rate of messages delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); + + /** + * Rate of bytes delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); + + /** + * Total count of messages delivered for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); + + /** + * Total count of bytes for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); + + /** + * Is statistics collection enabled for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled") + boolean isStatisticsEnabled(); } diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java index d6b79d1dde..6ef0fd5b19 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java @@ -37,8 +37,9 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame /** * The management interface exposed to allow management of Connections. - * @author Bhupendra Bhardwaj - * @version 0.1 + * + * @version Qpid JMX API 2.1 + * @since Qpid JMX API 1.3 */ public interface ManagedConnection { @@ -145,4 +146,120 @@ public interface ManagedConnection description="Closes this connection and all related channels", impact= MBeanOperationInfo.ACTION) void closeConnection() throws Exception; + + /** + * Resets message and data statistics for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanOperation(name="resetStatistics", + description="Resets message and data statistics for this connection", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); + + /** + * Peak rate of bytes delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); + + /** + * Rate of messages delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); + + /** + * Rate of bytes delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); + + /** + * Total count of messages delivered for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); + + /** + * Total count of bytes for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); + + /** + * Is statistics collection enabled for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled") + boolean isStatisticsEnabled(); + + void setStatisticsEnabled(boolean enabled); } diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index 618403fdca..abee6a745e 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -22,10 +22,15 @@ package org.apache.qpid.management.common.mbeans; import java.io.IOException; +import javax.management.MBeanOperationInfo; + import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; /** * Interface for the ServerInformation MBean + * + * @version Qpid JMX API 2.1 * @since Qpid JMX API 1.3 */ public interface ServerInformation @@ -42,7 +47,7 @@ public interface ServerInformation * Qpid JMX API 1.1 can be assumed. */ int QPID_JMX_API_MAJOR_VERSION = 2; - int QPID_JMX_API_MINOR_VERSION = 0; + int QPID_JMX_API_MINOR_VERSION = 1; /** @@ -80,4 +85,118 @@ public interface ServerInformation @MBeanAttribute(name="ProductVersion", description = "The product version string") String getProductVersion() throws IOException; + + /** + * Resets all message and data statistics for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanOperation(name="resetStatistics", + description="Resets all message and data statistics for the broker", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages delivered per second for the virtual host. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); + + /** + * Peak rate of bytes delivered per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); + + /** + * Rate of messages delivered per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); + + /** + * Rate of bytes delivered per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); + + /** + * Total count of messages delivered for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); + + /** + * Total count of bytes for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for the broker. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); + + /** + * Is statistics collection enabled for this connection. + * + * @since Qpid JMX API 2.1 + */ + @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled") + boolean isStatisticsEnabled(); } diff --git a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF index 124fe1e767..2164c5d326 100644 --- a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF +++ b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF @@ -3,7 +3,7 @@ Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt Bundle-ManifestVersion: 2 Bundle-Name: Qpid JMX Management Console Plug-in Bundle-SymbolicName: org.apache.qpid.management.ui; singleton:=true -Bundle-Version: 0.9.0 +Bundle-Version: 0.11.0 Bundle-Activator: org.apache.qpid.management.ui.Activator Bundle-Vendor: Apache Software Foundation Bundle-Localization: plugin diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java index 20cfec3758..8a855a6b3c 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java @@ -47,7 +47,7 @@ public abstract class ApplicationRegistry //max supported broker management interface supported by this release of the management console public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2; - public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 0; + public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 1; public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc"; diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXManagedObject.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXManagedObject.java index 3561e16098..a8fb864cf6 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXManagedObject.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXManagedObject.java @@ -21,6 +21,7 @@ package org.apache.qpid.management.ui.jmx; import java.util.HashMap; +import java.util.Map; import javax.management.ObjectName; @@ -31,14 +32,36 @@ public class JMXManagedObject extends ManagedBean { private ObjectName _objName; - @SuppressWarnings("unchecked") public JMXManagedObject(ObjectName objName) { super(); this._objName = objName; setUniqueName(_objName.toString()); - setDomain(_objName.getDomain()); - super.setProperties(new HashMap(_objName.getKeyPropertyList())); + setDomain(_objName.getDomain()); + + HashMap<String,String> props = new HashMap<String,String>(_objName.getKeyPropertyList()); + + for(Map.Entry<String,String> entry : props.entrySet()) + { + String value = entry.getValue(); + + if(value != null) + { + try + { + //if the name is quoted in the ObjectName, unquote it + value = ObjectName.unquote(value); + entry.setValue(value); + } + catch(IllegalArgumentException e) + { + //ignore, this just means the name is not quoted + //and can be left unchanged + } + } + } + + super.setProperties(props); } public ObjectName getObjectName() diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/NotificationObject.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/NotificationObject.java index e42b3c53b6..35cc9f6e27 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/NotificationObject.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/NotificationObject.java @@ -61,7 +61,7 @@ public class NotificationObject { if (_source instanceof ObjectName) { - return ((ObjectName)_source).getKeyProperty("name"); + return unquote(((ObjectName)_source).getKeyProperty("name")); } return null; @@ -71,12 +71,31 @@ public class NotificationObject { if (_source instanceof ObjectName) { - return ((ObjectName)_source).getKeyProperty(VIRTUAL_HOST); + return unquote(((ObjectName)_source).getKeyProperty(VIRTUAL_HOST)); } return null; } - + + private String unquote(String value) + { + if(value != null) + { + try + { + //if the value is quoted in the ObjectName, unquote it + value = ObjectName.unquote(value); + } + catch(IllegalArgumentException e) + { + //ignore, this just means the value is not quoted + //and can be left unchanged + } + } + + return value; + } + public String getMessage() { return _message; diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTabFolderFactory.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTabFolderFactory.java index 1fef89d6b5..527fc67be3 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTabFolderFactory.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTabFolderFactory.java @@ -131,6 +131,8 @@ public class MBeanTabFolderFactory } break; case VHOST_MANAGER: + createAttributesTab(tabFolder, mbean); + tab = new TabItem(tabFolder, SWT.NONE); tab.setText("Operations"); controller = new VHostTabControl(tabFolder, mbean, mbsc); diff --git a/qpid/java/module.xml b/qpid/java/module.xml index d3954a1544..80f577b018 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -401,6 +401,15 @@ </target> + <target name="report-module" description="generate junit reports for each module"> + <junitreport todir="${module.results}"> + <fileset dir="${module.results}"> + <include name="TEST-*.xml"/> + </fileset> + <report format="frames" todir="${module.results}/report/html"/> + </junitreport> + </target> + <target name="touch-failed" if="test.failures"> <touch file="${module.failed}"/> <touch file="${build.failed}"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java index 004ce5ea8f..bf96dae02e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java @@ -27,6 +27,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.jms.Connection; import javax.jms.JMSException; @@ -51,6 +52,7 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase private Session _session; MessageConsumer _consumer; MessageProducer _producer; + UUID myUUID = UUID.randomUUID(); public void setUp() throws Exception { @@ -119,7 +121,8 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase m.setFloat("Float", Integer.MAX_VALUE + 5000); m.setInt("Int", Integer.MAX_VALUE - 5000); m.setShort("Short", (short)58); - m.setString("String", "Hello"); + m.setString("String", "Hello"); + m.setObject("uuid", myUUID); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -140,6 +143,7 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int")); assertEquals((short)58,m.getShort("Short")); assertEquals("Hello",m.getString("String")); + assertEquals(myUUID,(UUID)m.getObject("uuid")); } @@ -149,7 +153,11 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase List<Integer> myList = getList(); - m.setObject("List", myList); + m.setObject("List", myList); + + List<UUID> uuidList = new ArrayList<UUID>(); + uuidList.add(myUUID); + m.setObject("uuid-list", uuidList); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -167,6 +175,10 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals(i,j.intValue()); i++; } + + List<UUID> list2 = (List<UUID>)msg.getObject("uuid-list"); + assertNotNull("UUID List not received",list2); + assertEquals(myUUID,list2.get(0)); } public void testMessageWithMapEntries() throws JMSException @@ -174,8 +186,12 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase MapMessage m = _session.createMapMessage(); Map<String,String> myMap = getMap(); + m.setObject("Map", myMap); + + Map<String,UUID> uuidMap = new HashMap<String,UUID>(); + uuidMap.put("uuid", myUUID); + m.setObject("uuid-map", uuidMap); - m.setObject("Map", myMap); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -191,6 +207,10 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals("String" + i,map.get("Key" + i)); i++; } + + Map<String,UUID> map2 = (Map<String,UUID>)msg.getObject("uuid-map"); + assertNotNull("Map not received",map2); + assertEquals(myUUID,map2.get("uuid")); } public void testMessageWithNestedListsAndMaps() throws JMSException diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java new file mode 100644 index 0000000000..9839c6e475 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test enabling generation of message statistics on a per-connection basis. + */ +public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + // no statistics generation configured + } + + /** + * Test statistics on a single connection + */ + public void testEnablingStatisticsPerConnection() throws Exception + { + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + List<String> addresses = new ArrayList<String>(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); + + addresses.add(mc.getRemoteAddress()); + } + assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); + + Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + test.start(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + continue; + } + mc.setStatisticsEnabled(true); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + } + + sendUsing(test, 5, 200); + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); + } + else + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); + assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); + } + } + assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); + assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); + + test.close(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java new file mode 100644 index 0000000000..df8c6e74cd --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java @@ -0,0 +1,177 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test enabling generation of message statistics on a per-connection basis. + */ +public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker"))); + setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost"))); + setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection"))); + } + + /** + * Just broker statistics. + */ + public void testGenerateBrokerStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); + assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); + assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); + } + } + + /** + * Just virtualhost statistics. + */ + public void testGenerateVirtualhostStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); + assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); + assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); + } + } + + /** + * Just connection statistics. + */ + public void testGenerateConnectionStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); + assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); + assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); + assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); + } + } + + /** + * Both broker and virtualhost statistics. + */ + public void testGenerateBrokerAndVirtualhostStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); + assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); + assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); + assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); + } + } + + /** + * Broker, virtualhost and connection statistics. + */ + public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); + assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); + assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); + assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java new file mode 100644 index 0000000000..e657856d0e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test statistics for delivery and receipt. + */ +public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + setConfigurationProperty("statistics.generation.connections", "true"); + } + + public void testDeliveryAndReceiptStatistics() throws Exception + { + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + List<String> addresses = new ArrayList<String>(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); + + addresses.add(mc.getRemoteAddress()); + } + + assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered()); + assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered()); + assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); + + Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + test.start(); + receiveUsing(test, 5); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); + } + else + { + assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived()); + } + } + assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered()); + assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered()); + assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); + + test.close(); + } + + protected void receiveUsing(Connection con, int number) throws Exception + { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + createQueue(session); + MessageConsumer consumer = session.createConsumer(_queue); + for (int i = 0; i < number; i++) + { + Message msg = consumer.receive(100); + assertNotNull("Message " + i + " was not received", msg); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java new file mode 100644 index 0000000000..180440c0d6 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.List; + +import org.apache.qpid.util.LogMonitor; + +/** + * Test generation of message statistics reporting. + */ +public class MessageStatisticsReportingTest extends MessageStatisticsTestCase +{ + protected LogMonitor _monitor; + + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + + if (getName().equals("testEnabledStatisticsReporting")) + { + setConfigurationProperty("statistics.reporting.period", "10"); + } + + _monitor = new LogMonitor(_outputFile); + } + + /** + * Test enabling reporting. + */ + public void testEnabledStatisticsReporting() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 15, 100); + + Thread.sleep(10 * 1000); // 15s + + List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); + List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); + List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); + List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); + + assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size()); + assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size()); + assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size()); + assertEquals("Incorrect number of virtualhost message stats log messages", 6, vhostStatsMessages.size()); + } + + /** + * Test not enabling reporting. + */ + public void testNotEnabledStatisticsReporting() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 15, 100); + + Thread.sleep(10 * 1000); // 15s + + List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); + List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); + List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); + List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); + + assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size()); + assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size()); + assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size()); + assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size()); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java new file mode 100644 index 0000000000..50ca51b18a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java @@ -0,0 +1,233 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test generation of message statistics. + */ +public class MessageStatisticsTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + setConfigurationProperty("statistics.generation.connections", "true"); + } + + /** + * Test message totals. + */ + public void testMessageTotals() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 5, 100); + sendUsing(_local, 5, 100); + sendUsing(_local, 5, 100); + Thread.sleep(2000); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + ManagedBroker local = _jmxUtils.getManagedBroker("localhost"); + + if (!isBroker010()) + { + long total = 0; + long data = 0; + for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) + { + total += mc.getTotalMessagesReceived(); + data += mc.getTotalDataReceived(); + } + assertEquals("Incorrect connection total", 45, total); + assertEquals("Incorrect connection data", 4500, data); + } + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived()); + } + + if (!isBroker010()) + { + long testTotal = 0; + long testData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + testTotal += mc.getTotalMessagesReceived(); + testData += mc.getTotalDataReceived(); + } + assertEquals("Incorrect test connection total", 10, testTotal); + assertEquals("Incorrect test connection data", 1000, testData); + } + assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived()); + + if (!isBroker010()) + { + long devTotal = 0; + long devData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("development")) + { + devTotal += mc.getTotalMessagesReceived(); + devData += mc.getTotalDataReceived(); + } + assertEquals("Incorrect test connection total", 20, devTotal); + assertEquals("Incorrect test connection data", 2000, devData); + } + assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived()); + assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived()); + + if (!isBroker010()) + { + long localTotal = 0; + long localData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost")) + { + localTotal += mc.getTotalMessagesReceived(); + localData += mc.getTotalDataReceived(); + } + assertEquals("Incorrect test connection total", 15, localTotal); + assertEquals("Incorrect test connection data", 1500, localData); + } + assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived()); + assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived()); + } + + /** + * Test message totals when a connection is closed. + */ + public void testMessageTotalsWithClosedConnections() throws Exception + { + Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + temp.start(); + + sendUsing(_test, 10, 100); + sendUsing(temp, 10, 100); + sendUsing(_test, 10, 100); + Thread.sleep(2000); + + temp.close(); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + + if (!isBroker010()) + { + long total = 0; + long data = 0; + for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) + { + total += mc.getTotalMessagesReceived(); + data += mc.getTotalDataReceived(); + } + assertEquals("Incorrect active connection total", 20, total); + assertEquals("Incorrect active connection data", 2000, data); + } + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived()); + } + + if (!isBroker010()) + { + long testTotal = 0; + long testData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + testTotal += mc.getTotalMessagesReceived(); + testData += mc.getTotalDataReceived(); + } + assertEquals("Incorrect test active connection total", 20, testTotal); + assertEquals("Incorrect test active connection data", 20 * 100, testData); + } + assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived()); + } + + /** + * Test message peak rate generation. + */ + public void testMessagePeakRates() throws Exception + { + sendUsing(_test, 2, 10); + Thread.sleep(10000); + sendUsing(_dev, 4, 10); + Thread.sleep(10000); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + + assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate()); + assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate()); + assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate()); + assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate()); + + if (!_broker.equals(VM)) + { + assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate()); + assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate()); + } + } + + /** + * Test message totals when a vhost has its statistics reset + */ + public void testMessageTotalVhostReset() throws Exception + { + sendUsing(_test, 10, 10); + sendUsing(_dev, 10, 10); + Thread.sleep(2000); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + + assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); + } + + test.resetStatistics(); + + assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java new file mode 100644 index 0000000000..a5b3aa283c --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Test generation of message statistics. + */ +public abstract class MessageStatisticsTestCase extends QpidBrokerTestCase +{ + protected static final String USER = "admin"; + + protected JMXTestUtils _jmxUtils; + protected Connection _test, _dev, _local; + protected String _queueName = "statistics"; + protected Destination _queue; + protected String _brokerUrl; + + @Override + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this, USER, USER); + _jmxUtils.setUp(); + + configureStatistics(); + + super.setUp(); + + _brokerUrl = getBroker().toString(); + _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development"); + _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost"); + + _test.start(); + _dev.start(); + _local.start(); + + _jmxUtils.open(); + } + + protected void createQueue(Session session) throws AMQException, JMSException + { + _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName); + if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue)) + { + ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName)); + } + } + + + @Override + public void tearDown() throws Exception + { + _jmxUtils.close(); + + _test.close(); + _dev.close(); + _local.close(); + + super.tearDown(); + } + + /** + * Configure statistics generation properties on the broker. + */ + public abstract void configureStatistics() throws Exception; + + protected void sendUsing(Connection con, int number, int size) throws Exception + { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + createQueue(session); + MessageProducer producer = session.createProducer(_queue); + String content = new String(new byte[size]); + TextMessage msg = session.createTextMessage(content); + for (int i = 0; i < number; i++) + { + producer.send(msg); + } + } + + /** + * Asserts that the actual value is within the expected value plus or + * minus the given error. + */ + public void assertApprox(String message, double error, double expected, double actual) + { + double min = expected * (1.0d - error); + double max = expected * (1.0d + error); + String assertion = String.format("%s: expected %f +/- %d%%, actual %f", + message, expected, (int) (error * 100.0d), actual); + assertTrue(assertion, actual > min && actual < max); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 51589c705f..0480ea4cab 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -31,13 +31,20 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueReceiver; +import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import javax.naming.Context; import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; @@ -796,4 +803,190 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { } } + + public void testQueueReceiversAndTopicSubscriber() throws Exception + { + Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); + Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); + + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueReceiver receiver = qSession.createReceiver(queue); + + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = tSession.createSubscriber(topic); + + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); + prod1.send(ssn.createTextMessage("test1")); + + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); + prod2.send(ssn.createTextMessage("test2")); + + Message msg1 = receiver.receive(); + assertNotNull(msg1); + assertEquals("test1",((TextMessage)msg1).getText()); + + Message msg2 = sub.receive(); + assertNotNull(msg2); + assertEquals("test2",((TextMessage)msg2).getText()); + } + + public void testDurableSubscriber() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Topic topic = ssn.createTopic("news.us"); + + MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub"); + MessageProducer prod = ssn.createProducer(topic); + + Message m = ssn.createTextMessage("A"); + prod.send(m); + Message msg = cons.receive(1000); + assertNotNull(msg); + assertEquals("A",((TextMessage)msg).getText()); + } + + public void testDeleteOptions() throws Exception + { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons; + + // default (create never, assert never) ------------------- + // create never -------------------------------------------- + String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; + AMQDestination dest = new AMQAnyDestination(addr1); + try + { + cons = jmsSession.createConsumer(dest); + cons.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; + dest = new AMQAnyDestination(addr2); + try + { + cons = jmsSession.createConsumer(dest); + cons.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; + dest = new AMQAnyDestination(addr3); + try + { + cons = jmsSession.createConsumer(dest); + MessageProducer prod = jmsSession.createProducer(dest); + prod.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + } + + /** + * Test Goals : 1. Test if the client sets the correct accept mode for unreliable + * and at-least-once. + * 2. Test default reliability modes for Queues and Topics. + * 3. Test if an exception is thrown if exactly-once is used. + * 4. Test if an exception is thrown if at-least-once is used with topics. + * + * Test Strategy: For goal #1 & #2 + * For unreliable and at-least-once the test tries to receives messages + * in client_ack mode but does not ack the messages. + * It will then close the session, recreate a new session + * and will then try to verify the queue depth. + * For unreliable the messages should have been taken off the queue. + * For at-least-once the messages should be put back onto the queue. + * + */ + + public void testReliabilityOptions() throws Exception + { + String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; + acceptModeTest(addr1,0); + + String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; + acceptModeTest(addr2,2); + + // Default accept-mode for topics + acceptModeTest("ADDR:amq.topic/test",0); + + // Default accept-mode for queues + acceptModeTest("ADDR:testQueue1;{create: always}",2); + + String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; + try + { + AMQAnyDestination dest = new AMQAnyDestination(addr3); + fail("An exception should be thrown indicating it's an unsupported type"); + } + catch(Exception e) + { + assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); + } + + String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; + try + { + AMQAnyDestination dest = new AMQAnyDestination(addr4); + Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + MessageConsumer cons = ssn.createConsumer(dest); + fail("An exception should be thrown indicating it's an unsupported combination"); + } + catch(Exception e) + { + assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); + } + } + + private void acceptModeTest(String address, int expectedQueueDepth) throws Exception + { + Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + MessageConsumer cons; + MessageProducer prod; + + AMQDestination dest = new AMQAnyDestination(address); + cons = ssn.createConsumer(dest); + prod = ssn.createProducer(dest); + + for (int i=0; i < expectedQueueDepth; i++) + { + prod.send(ssn.createTextMessage("Msg" + i)); + } + + for (int i=0; i < expectedQueueDepth; i++) + { + Message msg = cons.receive(1000); + assertNotNull(msg); + assertEquals("Msg" + i,((TextMessage)msg).getText()); + } + + ssn.close(); + ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); + assertEquals(expectedQueueDepth,queueDepth); + cons.close(); + prod.close(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java index 33575b58aa..8577fb5b6a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.test.unit.client; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; @@ -32,11 +34,9 @@ import javax.jms.Session; * * Test to validate that setting the respective qpid.declare_queues, * qpid.declare_exchanges system properties functions as expected. - * */ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase { - public void testQueueDeclare() throws Exception { setSystemProperty("qpid.declare_queues", "false"); @@ -53,11 +53,8 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase fail("JMSException should be thrown as the queue does not exist"); } catch (JMSException e) - { - assertTrue("Exception should be that the queue does not exist :" + - e.getMessage(), - e.getMessage().contains("does not exist")); - + { + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); } } @@ -79,10 +76,15 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase } catch (JMSException e) { - assertTrue("Exception should be that the exchange does not exist :" + - e.getMessage(), - e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist")); + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); } } + private void checkExceptionErrorCode(JMSException original, AMQConstant code) + { + Exception linked = original.getLinkedException(); + assertNotNull("Linked exception should have been set", linked); + assertTrue("Linked exception should be an AMQException", linked instanceof AMQException); + assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode()); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index f0794c9dab..b6232b1734 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -29,7 +29,6 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.*; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,70 +48,67 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class); Connection _connection; - private String _brokerlist = "vm://:1"; private Session _session; private static final long SYNC_TIMEOUT = 500; private int TEST = 0; - /* - close channel, use chanel with same id ensure error. + /** + * Close channel, use chanel with same id ensure error. + * + * This test is only valid for non 0-10 connection . */ public void testReusingChannelAfterFullClosure() throws Exception { - // this is testing an inVM Connetion conneciton - if (isJavaBroker() && !isExternalBroker()) + _connection=newConnection(); + + // Create Producer + try { - _connection=newConnection(); + _connection.start(); + + createChannelAndTest(1); - // Create Producer + // Cause it to close try { - _connection.start(); - - createChannelAndTest(1); - - // Cause it to close - try - { - _logger.info("Testing invalid exchange"); - declareExchange(1, "", "name_that_will_lookup_to_null", false); - fail("Exchange name is empty so this should fail "); - } - catch (AMQException e) - { - assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); - } + _logger.info("Testing invalid exchange"); + declareExchange(1, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) + { + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); + } - // Check that - try + // Check that + try + { + _logger.info("Testing valid exchange should fail"); + declareExchange(1, "topic", "amq.topic", false); + fail("This should not succeed as the channel should be closed "); + } + catch (AMQException e) + { + if (_logger.isInfoEnabled()) { - _logger.info("Testing valid exchange should fail"); - declareExchange(1, "topic", "amq.topic", false); - fail("This should not succeed as the channel should be closed "); + _logger.info("Exception occured was:" + e.getErrorCode()); } - catch (AMQException e) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Exception occured was:" + e.getErrorCode()); - } - assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); - _connection=newConnection(); - } + _connection=newConnection(); + } - checkSendingMessage(); + checkSendingMessage(); - _session.close(); - _connection.close(); + _session.close(); + _connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); } } @@ -306,27 +302,19 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis private Connection newConnection() { - AMQConnection connection = null; + Connection connection = null; try { - connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); + connection = getConnection(); - connection.setConnectionListener(this); + ((AMQConnection) connection).setConnectionListener(this); _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); connection.start(); } - catch (JMSException e) - { - fail("Creating new connection when:" + e.getMessage()); - } - catch (AMQException e) - { - fail("Creating new connection when:" + e.getMessage()); - } - catch (URLSyntaxException e) + catch (Exception e) { fail("Creating new connection when:" + e.getMessage()); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java new file mode 100644 index 0000000000..36bac3b715 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that changing the {@code transactionTimeout} configuration will alter + * the behaviour of the transaction open and idle logging, and that when the connection + * will be closed. + */ +public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + + // Set transaction timout properties. + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(5, 0.3f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(6, 3); + + check(OPEN); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java new file mode 100644 index 0000000000..71b89bf911 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that the default behaviour is not to time out transactions. + */ +public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(5, 0.3f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java new file mode 100644 index 0000000000..c912d6a323 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration + * is set for a virtual host. + * + * A producer that is idle for too long or open for too long will have its connection closed and + * any further operations will fail with a 408 resource timeout exception. Consumers will not + * be affected by the transaction timeout configuration. + */ +public class TransactionTimeoutTest extends TransactionTimeoutTestCase +{ + public void testProducerIdle() throws Exception + { + try + { + sleep(2.0f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(6, 0.5f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(0, 10); + + check(OPEN); + } + + public void testProducerIdleCommitTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.commit(); + + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerOpenCommitTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.commit(); + + send(6, 0.5f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + // the presistent store generates more idle messages? + monitor(isBrokerStorePersistent() ? 10 : 5, 10); + + check(OPEN); + } + + public void testProducerIdleRollback() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerIdleRollbackTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.rollback(); + + send(5, 0); + + sleep(2.0f); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testConsumerCommitClose() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + _csession.commit(); + + sleep(3.0f); + + _csession.close(); + } + catch (Exception e) + { + fail("should have succeeded: " + e.getMessage()); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleReceiveCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(2.0f); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleRollback() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerOpenCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerOpenRollback() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java new file mode 100644 index 0000000000..637f43fb2c --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; + +/** + * The {@link TestCase} for transaction timeout testing. + */ +public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener +{ + public static final String VIRTUALHOST = "test"; + public static final String TEXT = "0123456789abcdefghiforgettherest"; + public static final String CHN_OPEN_TXN = "CHN-1007"; + public static final String CHN_IDLE_TXN = "CHN-1008"; + public static final String IDLE = "Idle"; + public static final String OPEN = "Open"; + + protected LogMonitor _monitor; + protected AMQConnection _con; + protected Session _psession, _csession; + protected Queue _queue; + protected MessageConsumer _consumer; + protected MessageProducer _producer; + protected CountDownLatch _caught = new CountDownLatch(1); + protected String _message; + protected Exception _exception; + protected AMQConstant _code; + + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + + /* + * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: + * + * <transactionTimeout> + * <openWarn>1000</openWarn> + * <openClose>2000</openClose> + * <idleWarn>500</idleWarn> + * <idleClose>1500</idleClose> + * </transactionTimeout> + */ + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000"); + } + + protected void setUp() throws Exception + { + // Configure timeouts + configure(); + + // Monitor log file + _monitor = new LogMonitor(_outputFile); + + // Start broker + super.setUp(); + + // Connect to broker + String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT); + ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'"); + _con = (AMQConnection) getConnection(url); + _con.setExceptionListener(this); + _con.start(); + + // Create queue + Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); + AMQShortString queueName = new AMQShortString("test"); + _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true); + qsession.close(); + + // Create producer and consumer + producer(); + consumer(); + } + + protected void tearDown() throws Exception + { + try + { + _con.close(); + } + finally + { + super.tearDown(); + } + } + + /** + * Create a transacted persistent message producer session. + */ + protected void producer() throws Exception + { + _psession = _con.createSession(true, Session.SESSION_TRANSACTED); + _producer = _psession.createProducer(_queue); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + + /** + * Create a transacted message consumer session. + */ + protected void consumer() throws Exception + { + _csession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumer = _csession.createConsumer(_queue); + } + + /** + * Send a number of messages to the queue, optionally pausing after each. + */ + protected void send(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _psession.createTextMessage(TEXT); + msg.setIntProperty("i", i); + _producer.send(msg); + } + } + + /** + * Sleep for a number of seconds. + */ + protected void sleep(float seconds) throws Exception + { + try + { + Thread.sleep((long) (seconds * 1000.0f)); + } + catch (InterruptedException ie) + { + throw new RuntimeException("Interrupted"); + } + } + + /** + * Check for idle and open messages. + * + * Either exactly zero messages, or +-2 error accepted around the specified number. + */ + protected void monitor(int idle, int open) throws Exception + { + List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN); + List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN); + + String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages"; + String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages"; + + if (idle == 0) + { + assertTrue(idleErr, idleMsgs.isEmpty()); + } + else + { + assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2); + } + + if (open == 0) + { + assertTrue(openErr, openMsgs.isEmpty()); + } + else + { + assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2); + } + } + + /** + * Receive a number of messages, optionally pausing after each. + */ + protected void expect(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _consumer.receive(1000); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText()); + assertEquals("Message order is incorrect", i, msg.getIntProperty("i")); + } + } + + /** + * Checks that the correct exception was thrown and was received + * by the listener with a 506 error code. + */ + protected void check(String reason)throws InterruptedException + { + assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS)); + assertNotNull("Should have thrown exception to client", _exception); + assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out")); + assertNotNull("Exception should have an error code", _code); + assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code); + } + + /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ + public void onException(JMSException jmse) + { + _caught.countDown(); + _message = jmse.getLinkedException().getMessage(); + if (jmse.getLinkedException() instanceof AMQException) + { + _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index ff80c91fac..9f6963643a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -21,6 +21,8 @@ package org.apache.qpid.test.utils; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.management.JMException; @@ -31,14 +33,18 @@ import javax.management.ObjectName; import javax.management.MalformedObjectNameException; import javax.management.remote.JMXConnector; +import junit.framework.TestCase; + import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.commands.objects.AllObjects; import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.LoggingManagement; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.UserManagement; /** @@ -232,7 +238,8 @@ public class JMXTestUtils { // Get the name of the test manager AllObjects allObject = new AllObjects(_mbsc); - allObject.querystring = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + vhostName + ",*"; + allObject.querystring = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + + ObjectName.quote(vhostName) + ",*"; Set<ObjectName> objectNames = allObject.returnObjects(); @@ -259,7 +266,9 @@ public class JMXTestUtils { // Get the name of the test manager AllObjects allObject = new AllObjects(_mbsc); - allObject.querystring = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + virtualHostName + ",name=" + queue + ",*"; + allObject.querystring = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + + ObjectName.quote(virtualHostName) + ",name=" + + ObjectName.quote(queue) + ",*"; Set<ObjectName> objectNames = allObject.returnObjects(); @@ -287,7 +296,9 @@ public class JMXTestUtils { // Get the name of the test manager AllObjects allObject = new AllObjects(_mbsc); - allObject.querystring = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" + virtualHostName + ",name=" + exchange + ",*"; + allObject.querystring = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" + + ObjectName.quote(virtualHostName) + ",name=" + + ObjectName.quote(exchange) + ",*"; Set<ObjectName> objectNames = allObject.returnObjects(); @@ -321,6 +332,16 @@ public class JMXTestUtils return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, managedClass, false); } + public <T> List<T> getManagedObjectList(Class<T> managedClass, Set<ObjectName> objectNames) + { + List<T> objects = new ArrayList<T>(); + for (ObjectName name : objectNames) + { + objects.add(getManagedObject(managedClass, name)); + } + return objects; + } + public ManagedBroker getManagedBroker(String virtualHost) { return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost)); @@ -355,4 +376,54 @@ public class JMXTestUtils ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement"); return getManagedObject(UserManagement.class, objectName); } + + /** + * Retrive {@link ServerInformation} JMX MBean. + */ + public ServerInformation getServerInformation() + { + // Get the name of the test manager + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*"; + + Set<ObjectName> objectNames = allObject.returnObjects(); + + TestCase.assertNotNull("Null ObjectName Set returned", objectNames); + TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size()); + + // We have verified we have only one value in objectNames so return it + return getManagedObject(ServerInformation.class, objectNames.iterator().next()); + } + + /** + * Retrive all {@link ManagedConnection} objects. + */ + public List<ManagedConnection> getAllManagedConnections() + { + // Get the name of the test manager + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=*,name=*"; + + Set<ObjectName> objectNames = allObject.returnObjects(); + + TestCase.assertNotNull("Null ObjectName Set returned", objectNames); + + return getManagedObjectList(ManagedConnection.class, objectNames); + } + + /** + * Retrive all {@link ManagedConnection} objects for a particular virtual host. + */ + public List<ManagedConnection> getManagedConnections(String vhost) + { + // Get the name of the test manager + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + ObjectName.quote(vhost) + ",name=*"; + + Set<ObjectName> objectNames = allObject.returnObjects(); + + TestCase.assertNotNull("Null ObjectName Set returned", objectNames); + + return getManagedObjectList(ManagedConnection.class, objectNames); + } } diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index e89b09cca2..4127682208 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -165,6 +165,11 @@ org.apache.qpid.server.security.firewall.FirewallConfigTest#* org.apache.qpid.server.security.firewall.FirewallConfigurationTest#* org.apache.qpid.server.plugins.PluginTest#* +// Transacion timeouts not implemented in CPP broker +org.apache.qpid.test.unit.transacted.TransactionTimeoutDisabledTest#* +org.apache.qpid.test.unit.transacted.TransactionTimeoutConfigurationTest#* +org.apache.qpid.test.unit.transacted.TransactionTimeoutTest#* + // Java broker only org.apache.qpid.server.logging.management.LoggingManagementMBeanTest#* org.apache.qpid.server.management.AMQUserManagementMBeanTest#* diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 3486d5c70c..c05aad0cb1 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -47,6 +47,7 @@ org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend // 0-10 Broker does not have a JMX connection MBean org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement +org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#* // 0-10 has different ideas about clientid and ownership of queues org.apache.qpid.server.queue.ModelTest#* @@ -54,9 +55,6 @@ org.apache.qpid.server.queue.ModelTest#* // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* -// QPID-2084 : this test needs more work for 0-10 -org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#* - //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) org.apache.qpid.server.queue.ProducerFlowControlTest#* @@ -74,6 +72,9 @@ org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage org.apache.qpid.test.unit.ack.RecoverTest#testRecoverInAutoAckListener +// This test uses 0-8 channel frames +org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* + //Temporarily adding the following until the issues are sorted out. //Should probably raise JIRAs for them. org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* |