diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-11-09 16:35:55 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-11-09 16:35:55 +0000 |
| commit | fb1d3a98129784e73c3c2770be1a15d9f7f088ac (patch) | |
| tree | 45ae861974476ff722e9e2bcfb3f2091344bdeca | |
| parent | 084d4c440b40ebc8f35578522d0640aa4415935f (diff) | |
| download | qpid-python-fb1d3a98129784e73c3c2770be1a15d9f7f088ac.tar.gz | |
QPID-2932: Split statistics into received and delivered
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1033077 13f79535-47bb-0310-9956-ffa450edef68
18 files changed, 693 insertions, 261 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 8fa478120d..88e276046b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -39,7 +39,6 @@ package org.apache.qpid.server; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; @@ -62,9 +61,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSessionMBean; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; @@ -356,44 +352,67 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr public void resetStatistics() throws Exception { - _virtualHost.getMessageStatistics().reset(); - _virtualHost.getDataStatistics().reset(); - - Collection<AMQProtocolSession> connections = _virtualHost.getConnectionRegistry().getConnections(); - for (AMQProtocolSession con : connections) - { - ((AMQProtocolSessionMBean) ((AMQMinaProtocolSession) con).getManagedObject()).resetStatistics(); - } + _virtualHost.resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return _virtualHost.getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return _virtualHost.getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return _virtualHost.getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return _virtualHost.getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return _virtualHost.getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return _virtualHost.getDataDeliveryStatistics().getTotal(); } - public double getPeakMessageRate() + public double getPeakMessageReceiptRate() { - return _virtualHost.getMessageStatistics().getPeak(); + return _virtualHost.getMessageReceiptStatistics().getPeak(); } - public double getPeakDataRate() + public double getPeakDataReceiptRate() { - return _virtualHost.getDataStatistics().getPeak(); + return _virtualHost.getDataReceiptStatistics().getPeak(); } - public double getMessageRate() + public double getMessageReceiptRate() { - return _virtualHost.getMessageStatistics().getRate(); + return _virtualHost.getMessageReceiptStatistics().getRate(); } - public double getDataRate() + public double getDataReceiptRate() { - return _virtualHost.getDataStatistics().getRate(); + return _virtualHost.getDataReceiptStatistics().getRate(); } - public long getTotalMessages() + public long getTotalMessagesReceived() { - return _virtualHost.getMessageStatistics().getTotal(); + return _virtualHost.getMessageReceiptStatistics().getTotal(); } - public long getTotalData() + public long getTotalDataReceived() { - return _virtualHost.getDataStatistics().getTotal(); + return _virtualHost.getDataReceiptStatistics().getTotal(); } public boolean isStatisticsEnabled() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 116c425a99..1240adcae4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -208,7 +208,7 @@ public class AMQChannel { long bodySize = _currentMessage.getContentHeaderBody().bodySize; long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp(); - _session.registerMessageDelivery(bodySize, timestamp); + _session.registerMessageReceived(bodySize, timestamp); try { _currentMessage.deliverToQueues(); @@ -972,6 +972,7 @@ public class AMQChannel throws AMQException { getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + _session.registerMessageDelivered(entry.getMessage().getSize()); } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java index ba626c872d..72bd88aaa5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java @@ -21,17 +21,14 @@ package org.apache.qpid.server.information.management; import java.io.IOException; -import java.util.Collection; import javax.management.JMException; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; /** MBean class for the ServerInformationMBean. */ @MBeanDescription("Server Information Interface") @@ -77,44 +74,67 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn public void resetStatistics() throws Exception { - registry.getDataStatistics().reset(); - registry.getMessageStatistics().reset(); - - Collection<VirtualHost> virtualhosts = registry.getVirtualHostRegistry().getVirtualHosts(); - for (VirtualHost vhost : virtualhosts) - { - ((AMQBrokerManagerMBean) vhost.getBrokerMBean()).resetStatistics(); - } + registry.resetStatistics(); } - public double getPeakMessageRate() + public double getPeakMessageDeliveryRate() { - return registry.getMessageStatistics().getPeak(); + return registry.getMessageDeliveryStatistics().getPeak(); } - public double getPeakDataRate() + public double getPeakDataDeliveryRate() { - return registry.getDataStatistics().getPeak(); + return registry.getDataDeliveryStatistics().getPeak(); } - public double getMessageRate() + public double getMessageDeliveryRate() { - return registry.getMessageStatistics().getRate(); + return registry.getMessageDeliveryStatistics().getRate(); } - public double getDataRate() + public double getDataDeliveryRate() { - return registry.getDataStatistics().getRate(); + return registry.getDataDeliveryStatistics().getRate(); } - public long getTotalMessages() + public long getTotalMessagesDelivered() { - return registry.getMessageStatistics().getTotal(); + return registry.getMessageDeliveryStatistics().getTotal(); } - public long getTotalData() + public long getTotalDataDelivered() { - return registry.getDataStatistics().getTotal(); + return registry.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return registry.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return registry.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return registry.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return registry.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return registry.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return registry.getDataReceiptStatistics().getTotal(); } public boolean isStatisticsEnabled() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties index 636b7bff17..2f1612aa2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties @@ -222,8 +222,8 @@ BRK_STOPPED = BRK-1005 : Stopped BRK_CONFIG = BRK-1006 : Using configuration : {0} # 0 - path BRK_LOG_CONFIG = BRK-1007 : Using logging configuration : {0} -BRK_STATS_DATA = BRK-1008 : {0,number,#.###} kB/s, {1,number,#} bytes -BRK_STATS_MSGS = BRK-1009 : {0,number,#.###} msg/s, {1,number,#} msgs +BRK_STATS_DATA = BRK-1008 : {0} : {1,number,#.###} kB/s, {2,number,#} bytes +BRK_STATS_MSGS = BRK-1009 : {0} : {1,number,#.###} msg/s, {2,number,#} msgs #ManagementConsole MNG_STARTUP = MNG-1001 : Startup @@ -245,8 +245,8 @@ MNG_CLOSE = MNG-1008 : Close # 0 - name VHT_CREATED = VHT-1001 : Created : {0} VHT_CLOSED = VHT-1002 : Closed -VHT_STATS_DATA = VHT-1003 : {0} : {1,number,#.###} kB/s, {2,number,#} bytes -VHT_STATS_MSGS = VHT-1004 : {0} : {1,number,#.###} msg/s, {2,number,#} msgs +VHT_STATS_DATA = VHT-1003 : {0} : {1} : {2,number,#.###} kB/s, {3,number,#} bytes +VHT_STATS_MSGS = VHT-1004 : {0} : {1} : {2,number,#.###} msg/s, {3,number,#} msgs #MessageStore # 0 - name diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 25d8761ec5..30138a400c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -154,7 +154,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, St private ApplicationRegistry _registry; private boolean _statisticsEnabled = false; - private StatisticsCounter _messageStats, _dataStats; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; public ManagedObject getManagedObject() { @@ -954,30 +954,52 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, St // No-op, interface munging between this and AMQProtocolSession } - public void registerMessageDelivery(long messageSize, long timestamp) + public void registerMessageDelivered(long messageSize) { if (isStatisticsEnabled()) { - _messageStats.registerEvent(1L, timestamp); - _dataStats.registerEvent(messageSize, timestamp); + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); } - _virtualHost.registerMessageDelivery(messageSize, timestamp); + _virtualHost.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; } - public StatisticsCounter getMessageStatistics() + public StatisticsCounter getMessageDeliveryStatistics() { - return _messageStats; + return _messagesDelivered; } - public StatisticsCounter getDataStatistics() + public StatisticsCounter getDataDeliveryStatistics() { - return _dataStats; + return _dataDelivered; } public void resetStatistics() { - _messageStats.reset(); - _dataStats.reset(); + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); } public void initialiseStatistics() @@ -985,8 +1007,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, St setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - _messageStats = new StatisticsCounter("messages-" + getSessionID()); - _dataStats = new StatisticsCounter("bytes-" + getSessionID()); + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); + _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); + _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); + _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); } public boolean isStatisticsEnabled() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index ba110c4826..d2814eb01b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -221,5 +221,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public ProtocolSessionIdentifier getSessionIdentifier(); - public void registerMessageDelivery(long messageSize, long timestamp); + public void registerMessageReceived(long messageSize, long timestamp); + + public void registerMessageDelivered(long messageSize); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 28f87fb585..ca07a04743 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -67,8 +67,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; @@ -343,38 +341,67 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void resetStatistics() throws Exception { - _session.getMessageStatistics().reset(); - _session.getDataStatistics().reset(); + _session.resetStatistics(); } - public double getPeakMessageRate() + public double getPeakMessageDeliveryRate() { - return _session.getMessageStatistics().getPeak(); + return _session.getMessageDeliveryStatistics().getPeak(); } - public double getPeakDataRate() + public double getPeakDataDeliveryRate() { - return _session.getDataStatistics().getPeak(); + return _session.getDataDeliveryStatistics().getPeak(); } - public double getMessageRate() + public double getMessageDeliveryRate() { - return _session.getMessageStatistics().getRate(); + return _session.getMessageDeliveryStatistics().getRate(); } - public double getDataRate() + public double getDataDeliveryRate() { - return _session.getDataStatistics().getRate(); + return _session.getDataDeliveryStatistics().getRate(); } - public long getTotalMessages() + public long getTotalMessagesDelivered() { - return _session.getMessageStatistics().getTotal(); + return _session.getMessageDeliveryStatistics().getTotal(); } - public long getTotalData() + public long getTotalDataDelivered() { - return _session.getDataStatistics().getTotal(); + return _session.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return _session.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return _session.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return _session.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return _session.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return _session.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return _session.getDataReceiptStatistics().getTotal(); } public boolean isStatisticsEnabled() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 93cc6b5d8b..aee9994eb3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -86,7 +86,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry, Stati protected Timer _reportingTimer; protected boolean _statisticsEnabled = false; - protected StatisticsCounter _messageStats, _dataStats; + protected StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; static { @@ -363,8 +363,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry, Stati if (broker) { - CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA(_dataStats.getPeak() / 1024.0, _dataStats.getTotal())); - CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS(_messageStats.getPeak(), _messageStats.getTotal())); + CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA("delivered", _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS("delivered", _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA("received", _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); + CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS("received", _messagesReceived.getPeak(), _messagesReceived.getTotal())); } if (virtualhost) @@ -372,11 +374,15 @@ public abstract class ApplicationRegistry implements IApplicationRegistry, Stati for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) { String name = vhost.getName(); - StatisticsCounter data = vhost.getDataStatistics(); - StatisticsCounter messages = vhost.getMessageStatistics(); + StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, data.getPeak() / 1024.0, data.getTotal())); - CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, messages.getPeak(), messages.getTotal())); + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, "delivered", dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, "delivered", messagesDelivered.getPeak(), messagesDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, "received", dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, "received", messagesReceived.getPeak(), messagesReceived.getTotal())); } } @@ -395,29 +401,50 @@ public abstract class ApplicationRegistry implements IApplicationRegistry, Stati } } - public void registerMessageDelivery(long messageSize, long timestamp) + public void registerMessageDelivered(long messageSize) { if (isStatisticsEnabled()) { - _messageStats.registerEvent(1L, timestamp); - _dataStats.registerEvent(messageSize, timestamp); + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); } } - public StatisticsCounter getMessageStatistics() + public void registerMessageReceived(long messageSize, long timestamp) { - return _messageStats; + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; } - public StatisticsCounter getDataStatistics() + public StatisticsCounter getDataDeliveryStatistics() { - return _dataStats; + return _dataDelivered; } public void resetStatistics() { - _messageStats.reset(); - _dataStats.reset(); + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) { @@ -430,8 +457,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry, Stati setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && getConfiguration().isStatisticsGenerationBrokerEnabled()); - _messageStats = new StatisticsCounter("messages"); - _dataStats = new StatisticsCounter("bytes"); + _messagesDelivered = new StatisticsCounter("messages-delivered"); + _dataDelivered = new StatisticsCounter("bytes-delivered"); + _messagesReceived = new StatisticsCounter("messages-received"); + _dataReceived = new StatisticsCounter("bytes-received"); } public boolean isStatisticsEnabled() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java index 02db9226dc..36fec4025a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java @@ -47,7 +47,7 @@ public interface StatisticsGatherer void initialiseStatistics(); /** - * This method is responsible for registering the delivery of a message + * This method is responsible for registering the receipt of a message * with the counters, and also for passing this notification to any parent * {@link StatisticsGatherer}s. If statistics generation is not enabled, * then this method should simple delegate to the parent gatherer. @@ -55,23 +55,49 @@ public interface StatisticsGatherer * @param messageSize the size in bytes of the delivered message * @param timestamp the time the message was delivered */ - void registerMessageDelivery(long messageSize, long timestamp); + void registerMessageReceived(long messageSize, long timestamp); + + /** + * This method is responsible for registering the delivery of a message + * with the counters. Message delivery is recorded by the counter using + * the current system time, as opposed to the message timestamp. + * + * @param messageSize the size in bytes of the delivered message + * @see #registerMessageReceived(long, long) + */ + void registerMessageDelivered(long messageSize); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * delivered message statistics. + * + * @return the {@link StatisticsCounter} that counts delivered messages + */ + StatisticsCounter getMessageDeliveryStatistics(); + + /** + * Gives access to the {@link StatisticsCounter} that is used to count + * received message statistics. + * + * @return the {@link StatisticsCounter} that counts received messages + */ + StatisticsCounter getMessageReceiptStatistics(); /** * Gives access to the {@link StatisticsCounter} that is used to count - * message statistics. + * delivered message size statistics. * - * @return the {@link StatisticsCounter} that counts messages + * @return the {@link StatisticsCounter} that counts delivered bytes */ - StatisticsCounter getMessageStatistics(); + StatisticsCounter getDataDeliveryStatistics(); /** * Gives access to the {@link StatisticsCounter} that is used to count - * message size statistics. + * received message size statistics. * - * @return the {@link StatisticsCounter} that counts bytes + * @return the {@link StatisticsCounter} that counts received bytes */ - StatisticsCounter getDataStatistics(); + StatisticsCounter getDataReceiptStatistics(); /** * Reset the counters for this, and any child {@link StatisticsGatherer}s. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 32a3c2b580..f1efbe5e53 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -100,7 +100,7 @@ public class VirtualHost implements Accessable, StatisticsGatherer private ApplicationRegistry _registry; private boolean _statisticsEnabled = false; - private StatisticsCounter _messageStats, _dataStats; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; public void setAccessableName(String name) { @@ -490,30 +490,52 @@ public class VirtualHost implements Accessable, StatisticsGatherer return _virtualHostMBean; } - public void registerMessageDelivery(long messageSize, long timestamp) + public void registerMessageDelivered(long messageSize) { if (isStatisticsEnabled()) { - _messageStats.registerEvent(1L, timestamp); - _dataStats.registerEvent(messageSize, timestamp); + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); } - _registry.registerMessageDelivery(messageSize, timestamp); + _registry.registerMessageDelivered(messageSize); } - public StatisticsCounter getMessageStatistics() + public void registerMessageReceived(long messageSize, long timestamp) { - return _messageStats; + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _registry.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; } - public StatisticsCounter getDataStatistics() + public StatisticsCounter getDataDeliveryStatistics() { - return _dataStats; + return _dataDelivered; } public void resetStatistics() { - _messageStats.reset(); - _dataStats.reset(); + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); for (AMQProtocolSession session : _connectionRegistry.getConnections()) { @@ -526,8 +548,10 @@ public class VirtualHost implements Accessable, StatisticsGatherer setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); - _messageStats = new StatisticsCounter("messages-" + getName()); - _dataStats = new StatisticsCounter("bytes-" + getName()); + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); } public boolean isStatisticsEnabled() diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java index a1a04c77be..07c92d46dc 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java @@ -133,52 +133,100 @@ public interface ManagedBroker void resetStatistics() throws Exception; /** - * Peak rate of messages per second for the virtual host. + * Peak rate of messages delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") - double getPeakMessageRate(); + @MBeanAttribute(name="peakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); /** - * Peak rate of bytes per second for the virtual host. + * Peak rate of bytes delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") - double getPeakDataRate(); + @MBeanAttribute(name="peakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); /** - * Rate of messages per second for the virtual host. + * Rate of messages delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") - double getMessageRate(); + @MBeanAttribute(name="messageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); /** - * Rate of bytes per second for the virtual host. + * Rate of bytes delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") - double getDataRate(); + @MBeanAttribute(name="dataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); /** - * Total count of messages for the virtual host. + * Total count of messages delivered for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") - long getTotalMessages(); + @MBeanAttribute(name="totalMessagesDelivery", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); /** * Total count of bytes for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") - long getTotalData(); + @MBeanAttribute(name="totalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); /** * Is statistics collection enabled for this connection. diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java index f2be759249..37353477a4 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java @@ -151,52 +151,100 @@ public interface ManagedConnection void resetStatistics() throws Exception; /** - * Peak rate of messages per second on this connection. + * Peak rate of messages delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") - double getPeakMessageRate(); + @MBeanAttribute(name="peakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); /** - * Peak rate of bytes per second on this connection. + * Peak rate of bytes delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") - double getPeakDataRate(); + @MBeanAttribute(name="peakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); /** - * Rate of messages per second on this connection. + * Rate of messages delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") - double getMessageRate(); + @MBeanAttribute(name="messageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); /** - * Rate of bytes per second on this connection. + * Rate of bytes delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") - double getDataRate(); + @MBeanAttribute(name="dataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); /** - * Total count of messages on this connection. + * Total count of messages delivered for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") - long getTotalMessages(); + @MBeanAttribute(name="totalMessagesDelivery", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); /** - * Total count of bytes on this connection. + * Total count of bytes for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") - long getTotalData(); + @MBeanAttribute(name="totalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); /** * Is statistics collection enabled for this connection. diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index a47824f738..5388610342 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -96,52 +96,100 @@ public interface ServerInformation void resetStatistics() throws Exception; /** - * Peak rate of messages per second for the broker. + * Peak rate of messages delivered per second for the virtual host. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") - double getPeakMessageRate(); + @MBeanAttribute(name="peakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate") + double getPeakMessageDeliveryRate(); /** - * Peak rate of bytes per second for the broker. + * Peak rate of bytes delivered per second for the broker. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") - double getPeakDataRate(); + @MBeanAttribute(name="peakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate") + double getPeakDataDeliveryRate(); /** - * Rate of messages per second for the broker. + * Rate of messages delivered per second for the broker. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") - double getMessageRate(); + @MBeanAttribute(name="messageDeliveryRate", description=TYPE + " Message Delivery Rate") + double getMessageDeliveryRate(); /** - * Rate of bytes per second for the broker. + * Rate of bytes delivered per second for the broker. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") - double getDataRate(); + @MBeanAttribute(name="dataDeliveryRate", description=TYPE + " Data Delivery Rate") + double getDataDeliveryRate(); /** - * Total count of messages for the broker. + * Total count of messages delivered for the broker. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") - long getTotalMessages(); + @MBeanAttribute(name="totalMessagesDelivery", description=TYPE + " Total Messages Delivered") + long getTotalMessagesDelivered(); /** * Total count of bytes for the broker. * * @since Qpid JMX API 1.9 */ - @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") - long getTotalData(); + @MBeanAttribute(name="totalDataDelivered", description=TYPE + " Total Data Delivered") + long getTotalDataDelivered(); + + /** + * Peak rate of messages received per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate") + double getPeakMessageReceiptRate(); + + /** + * Peak rate of bytes received per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate") + double getPeakDataReceiptRate(); + + /** + * Rate of messages received per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageReceiptRate", description=TYPE + " Message Receipt Rate") + double getMessageReceiptRate(); + + /** + * Rate of bytes received per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataReceiptRate", description=TYPE + " Data Receipt Rate") + double getDataReceiptRate(); + + /** + * Total count of messages received for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessagesReceived", description=TYPE + " Total Messages Received") + long getTotalMessagesReceived(); + + /** + * Total count of bytes received for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalDataReceived", description=TYPE + " Total Data Received") + long getTotalDataReceived(); /** * Is statistics collection enabled for this connection. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java index e68d002fe5..cea90709e1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java @@ -47,18 +47,19 @@ public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); sendUsing(_test, 5, 200); + Thread.sleep(1000); List<String> addresses = new ArrayList<String>(); for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); addresses.add(mc.getRemoteAddress()); } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); test.start(); @@ -69,8 +70,8 @@ public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase continue; } mc.setStatisticsEnabled(true); - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); } sendUsing(test, 5, 200); @@ -80,19 +81,21 @@ public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase { if (addresses.contains(mc.getRemoteAddress())) { - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); } else { - assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); } } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); + + test.close(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java index dbd26f833a..df8c6e74cd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java @@ -45,20 +45,20 @@ public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCas for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); } ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); } } @@ -73,20 +73,20 @@ public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCas for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); } ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); } } @@ -101,20 +101,20 @@ public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCas for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); } ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); } } @@ -129,20 +129,20 @@ public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCas for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 0, mc.getTotalData()); + assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); } ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); } } @@ -157,20 +157,20 @@ public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCas for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); - assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); } ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java new file mode 100644 index 0000000000..b157d8d4f5 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test generation of message statistics. + */ +public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + setConfigurationProperty("statistics.generation.connections", "true"); + } + + /** + * Test statistics on a single connection + */ + public void testEnablingStatisticsPerConnection() throws Exception + { + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + List<String> addresses = new ArrayList<String>(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); + + addresses.add(mc.getRemoteAddress()); + } + + assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered()); + assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered()); + assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); + + Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + test.start(); + receiveUsing(test, 5); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); + } + else + { + assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered()); + assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered()); + assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived()); + assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived()); + } + } + assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered()); + assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered()); + assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); + assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); + + test.close(); + } + + protected void receiveUsing(Connection con, int number) throws Exception + { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(_queue); + for (int i = 0; i < number; i++) + { + Message msg = consumer.receive(100); + assertNotNull(msg); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java index 7051c426b9..180440c0d6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java @@ -60,10 +60,10 @@ public class MessageStatisticsReportingTest extends MessageStatisticsTestCase List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); - assertEquals("Incorrect number of broker data stats log messages", 1, brokerStatsData.size()); - assertEquals("Incorrect number of broker message stats log messages", 1, brokerStatsMessages.size()); - assertEquals("Incorrect number of virtualhost data stats log messages", 3, vhostStatsData.size()); - assertEquals("Incorrect number of virtualhost message stats log messages", 3, vhostStatsMessages.size()); + assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size()); + assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size()); + assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size()); + assertEquals("Incorrect number of virtualhost message stats log messages", 6, vhostStatsMessages.size()); } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java index 58fbb92723..44bb368f5a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java @@ -57,52 +57,52 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase long data = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("*")) { - total += mc.getTotalMessages(); - data += mc.getTotalData(); + total += mc.getTotalMessagesReceived(); + data += mc.getTotalDataReceived(); } assertEquals("Incorrect connection total", 45, total); assertEquals("Incorrect connection data", 4500, data); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived()); } long testTotal = 0; long testData = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - testTotal += mc.getTotalMessages(); - testData += mc.getTotalData(); + testTotal += mc.getTotalMessagesReceived(); + testData += mc.getTotalDataReceived(); } assertEquals("Incorrect test connection total", 10, testTotal); - assertEquals("Incorrect test vhost total", 10, test.getTotalMessages()); + assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived()); assertEquals("Incorrect test connection data", 1000, testData); - assertEquals("Incorrect test vhost data", 1000, test.getTotalData()); + assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived()); long devTotal = 0; long devData = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("development")) { - devTotal += mc.getTotalMessages(); - devData += mc.getTotalData(); + devTotal += mc.getTotalMessagesReceived(); + devData += mc.getTotalDataReceived(); } assertEquals("Incorrect test connection total", 20, devTotal); - assertEquals("Incorrect development total", 20, dev.getTotalMessages()); + assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived()); assertEquals("Incorrect test connection data", 2000, devData); - assertEquals("Incorrect development data", 2000, dev.getTotalData()); + assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived()); long localTotal = 0; long localData = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost")) { - localTotal += mc.getTotalMessages(); - localData += mc.getTotalData(); + localTotal += mc.getTotalMessagesReceived(); + localData += mc.getTotalDataReceived(); } assertEquals("Incorrect test connection total", 15, localTotal); - assertEquals("Incorrect localhost total", 15, local.getTotalMessages()); + assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived()); assertEquals("Incorrect test connection data", 1500, localData); - assertEquals("Incorrect localhost data", 1500, local.getTotalData()); + assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived()); } /** @@ -125,28 +125,28 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase long data = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("*")) { - total += mc.getTotalMessages(); - data += mc.getTotalData(); + total += mc.getTotalMessagesReceived(); + data += mc.getTotalDataReceived(); } assertEquals("Incorrect active connection total", 20, total); assertEquals("Incorrect active connection data", 2000, data); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived()); } long testTotal = 0; long testData = 0; for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) { - testTotal += mc.getTotalMessages(); - testData += mc.getTotalData(); + testTotal += mc.getTotalMessagesReceived(); + testData += mc.getTotalDataReceived(); } assertEquals("Incorrect test active connection total", 20, testTotal); - assertEquals("Incorrect test vhost total", 30, test.getTotalMessages()); + assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived()); assertEquals("Incorrect test active connection data", 20 * 100, testData); - assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalData()); + assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived()); } /** @@ -161,15 +161,15 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase ManagedBroker test = _jmxUtils.getManagedBroker("test"); ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageRate()); - assertApprox("Incorrect test vhost peak data", 0.2d, 10000.0d, test.getPeakDataRate()); - assertApprox("Incorrect dev vhost peak messages", 0.2d, 10.0d, dev.getPeakMessageRate()); - assertApprox("Incorrect dev vhost peak data", 0.2d, 100.0d, dev.getPeakDataRate()); + assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate()); + assertApprox("Incorrect test vhost peak data", 0.2d, 10000.0d, test.getPeakDataReceiptRate()); + assertApprox("Incorrect dev vhost peak messages", 0.2d, 10.0d, dev.getPeakMessageReceiptRate()); + assertApprox("Incorrect dev vhost peak data", 0.2d, 100.0d, dev.getPeakDataReceiptRate()); if (!_broker.equals(VM)) { - assertApprox("Incorrect server peak messages", 0.2d, 10.0d, _jmxUtils.getServerInformation().getPeakMessageRate()); - assertApprox("Incorrect server peak data", 0.2d, 10000.0d, _jmxUtils.getServerInformation().getPeakDataRate()); + assertApprox("Incorrect server peak messages", 0.2d, 10.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate()); + assertApprox("Incorrect server peak data", 0.2d, 10000.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate()); } } @@ -184,28 +184,28 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase ManagedBroker test = _jmxUtils.getManagedBroker("test"); ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessages()); - assertEquals("Incorrect test vhost total data", 100, test.getTotalData()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData()); + assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); } test.resetStatistics(); - assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessages()); - assertEquals("Incorrect test vhost total data", 0, test.getTotalData()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData()); + assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived()); + assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); if (!_broker.equals(VM)) { - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData()); + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); } } } |
