diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-11-23 15:14:07 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-11-23 15:14:07 +0000 |
commit | d6d8259fdd1324de272776bc055478a8bcc5bf94 (patch) | |
tree | 52b660e124af9d6ea75e87fb1ae0da2f59f52988 | |
parent | a57b35597317bfc680af76bf276301f1491c5d2b (diff) | |
download | qpid-python-d6d8259fdd1324de272776bc055478a8bcc5bf94.tar.gz |
QPID-2932: Updating tests and placement of counter callback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1038157 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 355 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1240adcae4..129d40d530 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -206,9 +206,6 @@ public class AMQChannel // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { - long bodySize = _currentMessage.getContentHeaderBody().bodySize; - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); try { _currentMessage.deliverToQueues(); @@ -223,6 +220,9 @@ public class AMQChannel } finally { + long bodySize = _currentMessage.getContentHeaderBody().bodySize; + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp(); + _session.registerMessageReceived(bodySize, timestamp); // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case _txnContext.messageProcessed(_session); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java index 04f94aa1b3..c64e4aaf3f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java @@ -82,7 +82,7 @@ public class StatisticsCounter { return; } - + long thisSample = (timestamp / _period); synchronized (this) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java new file mode 100644 index 0000000000..65047ff6ce --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class collects statistics and counts the total, rate per second and + * peak rate per second values for the events that are registered with it. + */ +public class StatisticsCounter1 +{ + private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter1.class); + + public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s + public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable"); + + private static final String COUNTER = "counter"; + private static final AtomicLong _counterIds = new AtomicLong(0L); + + private final AtomicLong _peak = new AtomicLong(0L); + private final AtomicLong _total = new AtomicLong(0L); + private final AtomicLong _temp = new AtomicLong(0L); + private final AtomicLong _last = new AtomicLong(0L); + private final AtomicLong _rate = new AtomicLong(0L); + + private long _start; + + private final long _period; + private final String _name; + + public StatisticsCounter1() + { + this(COUNTER); + } + + public StatisticsCounter1(String name) + { + this(name, DEFAULT_SAMPLE_PERIOD); + } + + public StatisticsCounter1(String name, long period) + { + _period = period; + _name = name + "-" + + _counterIds.incrementAndGet(); + reset(); + } + + public void registerEvent() + { + registerEvent(1L); + } + + public void registerEvent(long value) + { + registerEvent(value, System.currentTimeMillis()); + } + + public void registerEvent(long value, long timestamp) + { + if (DISABLE_STATISTICS) + { + return; + } + + long thisSample = (timestamp / _period); + long lastSample; + while (thisSample > (lastSample = _last.get())) + { + if (_last.compareAndSet(lastSample, thisSample)) + { + long current = _temp.getAndSet(0L); + _rate.set(current); + long peak; + while (current > (peak = _peak.get())) + { + _peak.compareAndSet(peak, current); + } + } + } + + _total.addAndGet(value); + _temp.addAndGet(value); + } + + /** + * Update the current rate and peak - may reset rate to zero if a new + * sample period has started. + */ + private void update() + { + registerEvent(0L, System.currentTimeMillis()); + } + + /** + * Reset + */ + public void reset() + { + _peak.set(0L); + _rate.set(0L); + _total.set(0L); + _start = System.currentTimeMillis(); + _last.set(_start / _period); + } + + public double getPeak() + { + update(); + return (double) _peak.get() / ((double) _period / 1000.0d); + } + + public double getRate() + { + update(); + return (double) _rate.get() / ((double) _period / 1000.0d); + } + + public long getTotal() + { + return _total.get(); + } + + public long getStart() + { + return _start; + } + + public Date getStartTime() + { + return new Date(_start); + } + + public String getName() + { + return _name; + } + + public long getPeriod() + { + return _period; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java new file mode 100644 index 0000000000..d8b9a6abdc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class collects statistics and counts the total, rate per second and + * peak rate per second values for the events that are registered with it. + */ +public class StatisticsCounter2 +{ + private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter2.class); + + public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s + public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable"); + + private static final String COUNTER = "counter"; + private static final AtomicLong _counterIds = new AtomicLong(0L); + + private long _peak = 0L; + private long _total = 0L; + private long _temp = 0L; + private long _last = 0L; + private long _rate = 0L; + + private long _start; + + private final long _period; + private final String _name; + + public StatisticsCounter2() + { + this(COUNTER); + } + + public StatisticsCounter2(String name) + { + this(name, DEFAULT_SAMPLE_PERIOD); + } + + public StatisticsCounter2(String name, long period) + { + _period = period; + _name = name + "-" + + _counterIds.incrementAndGet(); + reset(); + } + + public void registerEvent() + { + registerEvent(1L); + } + + public void registerEvent(long value) + { + registerEvent(value, System.currentTimeMillis()); + } + + public void registerEvent(long value, long timestamp) + { + if (DISABLE_STATISTICS) + { + return; + } + + long thisSample = (timestamp / _period); + synchronized (this) + { + if (thisSample > _last) + { + _last = thisSample; + _rate = _temp; + _temp = 0L; + if (_rate > _peak) + { + _peak = _rate; + } + } + + _total += value; + _temp += value; + } + } + + /** + * Update the current rate and peak - may reset rate to zero if a new + * sample period has started. + */ + private void update() + { + registerEvent(0L, System.currentTimeMillis()); + } + + /** + * Reset + */ + public void reset() + { + _peak = 0L; + _rate = 0L; + _total = 0L; + _start = System.currentTimeMillis(); + _last = _start / _period; + } + + public double getPeak() + { + update(); + return (double) _peak / ((double) _period / 1000.0d); + } + + public double getRate() + { + update(); + return (double) _rate / ((double) _period / 1000.0d); + } + + public long getTotal() + { + return _total; + } + + public long getStart() + { + return _start; + } + + public Date getStartTime() + { + return new Date(_start); + } + + public String getName() + { + return _name; + } + + public long getPeriod() + { + return _period; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index f1efbe5e53..4f32c266b5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -49,7 +48,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; @@ -61,7 +59,6 @@ import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java index 1e30946b01..a9bdcb2d21 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java @@ -76,22 +76,25 @@ public class StatisticsCounterTest extends TestCase /** * Test that the peak rate is reported correctly. */ - public void testPeak() + public void testPeak() throws Exception { StatisticsCounter counter = new StatisticsCounter("test", 1000L); long start = counter.getStart(); assertEquals(0.0, counter.getPeak()); counter.registerEvent(1000, start + 500); + Thread.sleep(1250); assertEquals(1000.0, counter.getPeak()); counter.registerEvent(2000, start + 1500); + Thread.sleep(1000); assertEquals(2000.0, counter.getPeak()); counter.registerEvent(1000, start + 2500); + Thread.sleep(1000); assertEquals(2000.0, counter.getPeak()); } /** - * Test that peak rate is reported correctly even when messages are - * delivered out-of-order. + * Test that peak rate is reported correctly for out-of-order messages, + * and the total is also unaffected. */ public void testPeakOutOfOrder() throws Exception { @@ -99,18 +102,22 @@ public class StatisticsCounterTest extends TestCase long start = counter.getStart(); assertEquals(0.0, counter.getPeak()); counter.registerEvent(1000, start + 2500); - assertEquals(1000.0, counter.getPeak()); + Thread.sleep(1250); + assertEquals(0.0, counter.getPeak()); counter.registerEvent(2000, start + 1500); - assertEquals(3000.0, counter.getPeak()); + Thread.sleep(1000); + assertEquals(0.0, counter.getPeak()); counter.registerEvent(1000, start + 500); + Thread.sleep(1500); assertEquals(4000.0, counter.getPeak()); Thread.sleep(2000); assertEquals(4000.0, counter.getPeak()); counter.registerEvent(1000, start + 500); - assertEquals(5000.0, counter.getPeak()); + assertEquals(4000.0, counter.getPeak()); Thread.sleep(2000); counter.registerEvent(1000); - assertEquals(5000.0, counter.getPeak()); + assertEquals(4000.0, counter.getPeak()); + assertEquals(6000, counter.getTotal()); } /** @@ -122,12 +129,13 @@ public class StatisticsCounterTest extends TestCase assertEquals(0.0, counter.getRate()); Thread.sleep(100); counter.registerEvent(1000); + Thread.sleep(1250); assertEquals(1000.0, counter.getRate()); - Thread.sleep(1000); counter.registerEvent(2000); - assertEquals(2000.0, counter.getRate()); Thread.sleep(1000); + assertEquals(2000.0, counter.getRate()); counter.registerEvent(1000); + Thread.sleep(1000); assertEquals(1000.0, counter.getRate()); Thread.sleep(1000); assertEquals(0.0, counter.getRate()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java index 44bb368f5a..721a464c67 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java @@ -154,22 +154,23 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase */ public void testMessagePeakRates() throws Exception { - sendUsing(_test, 1, 10000); + sendUsing(_test, 2, 10); + Thread.sleep(10 * 1000); + sendUsing(_dev, 4, 10); Thread.sleep(10 * 1000); - sendUsing(_dev, 10, 10); ManagedBroker test = _jmxUtils.getManagedBroker("test"); ManagedBroker dev = _jmxUtils.getManagedBroker("development"); assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate()); - assertApprox("Incorrect test vhost peak data", 0.2d, 10000.0d, test.getPeakDataReceiptRate()); - assertApprox("Incorrect dev vhost peak messages", 0.2d, 10.0d, dev.getPeakMessageReceiptRate()); - assertApprox("Incorrect dev vhost peak data", 0.2d, 100.0d, dev.getPeakDataReceiptRate()); + assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate()); + assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate()); + assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate()); if (!_broker.equals(VM)) { - assertApprox("Incorrect server peak messages", 0.2d, 10.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate()); - assertApprox("Incorrect server peak data", 0.2d, 10000.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate()); + assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate()); + assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java index 8ca02eb79a..ac95ae176c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java @@ -105,7 +105,7 @@ public abstract class MessageStatisticsTestCase extends QpidTestCase { double min = expected * (1.0d - error); double max = expected * (1.0d + error); - String assertion = String.format("%s: expected %f +/- %d, actual %f", + String assertion = String.format("%s: expected %f +/- %d%%, actual %f", message, expected, (int) (error * 100.0d), actual); assertTrue(assertion, actual > min && actual < max); } |