summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-11-23 15:14:07 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-11-23 15:14:07 +0000
commitd6d8259fdd1324de272776bc055478a8bcc5bf94 (patch)
tree52b660e124af9d6ea75e87fb1ae0da2f59f52988
parenta57b35597317bfc680af76bf276301f1491c5d2b (diff)
downloadqpid-python-d6d8259fdd1324de272776bc055478a8bcc5bf94.tar.gz
QPID-2932: Updating tests and placement of counter callback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1038157 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java163
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java162
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java26
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java15
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java2
8 files changed, 355 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1240adcae4..129d40d530 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -206,9 +206,6 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
{
- long bodySize = _currentMessage.getContentHeaderBody().bodySize;
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
- _session.registerMessageReceived(bodySize, timestamp);
try
{
_currentMessage.deliverToQueues();
@@ -223,6 +220,9 @@ public class AMQChannel
}
finally
{
+ long bodySize = _currentMessage.getContentHeaderBody().bodySize;
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+ _session.registerMessageReceived(bodySize, timestamp);
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
_txnContext.messageProcessed(_session);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
index 04f94aa1b3..c64e4aaf3f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -82,7 +82,7 @@ public class StatisticsCounter
{
return;
}
-
+
long thisSample = (timestamp / _period);
synchronized (this)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java
new file mode 100644
index 0000000000..65047ff6ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter1.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it.
+ */
+public class StatisticsCounter1
+{
+ private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter1.class);
+
+ public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
+ public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
+
+ private static final String COUNTER = "counter";
+ private static final AtomicLong _counterIds = new AtomicLong(0L);
+
+ private final AtomicLong _peak = new AtomicLong(0L);
+ private final AtomicLong _total = new AtomicLong(0L);
+ private final AtomicLong _temp = new AtomicLong(0L);
+ private final AtomicLong _last = new AtomicLong(0L);
+ private final AtomicLong _rate = new AtomicLong(0L);
+
+ private long _start;
+
+ private final long _period;
+ private final String _name;
+
+ public StatisticsCounter1()
+ {
+ this(COUNTER);
+ }
+
+ public StatisticsCounter1(String name)
+ {
+ this(name, DEFAULT_SAMPLE_PERIOD);
+ }
+
+ public StatisticsCounter1(String name, long period)
+ {
+ _period = period;
+ _name = name + "-" + + _counterIds.incrementAndGet();
+ reset();
+ }
+
+ public void registerEvent()
+ {
+ registerEvent(1L);
+ }
+
+ public void registerEvent(long value)
+ {
+ registerEvent(value, System.currentTimeMillis());
+ }
+
+ public void registerEvent(long value, long timestamp)
+ {
+ if (DISABLE_STATISTICS)
+ {
+ return;
+ }
+
+ long thisSample = (timestamp / _period);
+ long lastSample;
+ while (thisSample > (lastSample = _last.get()))
+ {
+ if (_last.compareAndSet(lastSample, thisSample))
+ {
+ long current = _temp.getAndSet(0L);
+ _rate.set(current);
+ long peak;
+ while (current > (peak = _peak.get()))
+ {
+ _peak.compareAndSet(peak, current);
+ }
+ }
+ }
+
+ _total.addAndGet(value);
+ _temp.addAndGet(value);
+ }
+
+ /**
+ * Update the current rate and peak - may reset rate to zero if a new
+ * sample period has started.
+ */
+ private void update()
+ {
+ registerEvent(0L, System.currentTimeMillis());
+ }
+
+ /**
+ * Reset
+ */
+ public void reset()
+ {
+ _peak.set(0L);
+ _rate.set(0L);
+ _total.set(0L);
+ _start = System.currentTimeMillis();
+ _last.set(_start / _period);
+ }
+
+ public double getPeak()
+ {
+ update();
+ return (double) _peak.get() / ((double) _period / 1000.0d);
+ }
+
+ public double getRate()
+ {
+ update();
+ return (double) _rate.get() / ((double) _period / 1000.0d);
+ }
+
+ public long getTotal()
+ {
+ return _total.get();
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+
+ public Date getStartTime()
+ {
+ return new Date(_start);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getPeriod()
+ {
+ return _period;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java
new file mode 100644
index 0000000000..d8b9a6abdc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter2.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it.
+ */
+public class StatisticsCounter2
+{
+ private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter2.class);
+
+ public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
+ public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
+
+ private static final String COUNTER = "counter";
+ private static final AtomicLong _counterIds = new AtomicLong(0L);
+
+ private long _peak = 0L;
+ private long _total = 0L;
+ private long _temp = 0L;
+ private long _last = 0L;
+ private long _rate = 0L;
+
+ private long _start;
+
+ private final long _period;
+ private final String _name;
+
+ public StatisticsCounter2()
+ {
+ this(COUNTER);
+ }
+
+ public StatisticsCounter2(String name)
+ {
+ this(name, DEFAULT_SAMPLE_PERIOD);
+ }
+
+ public StatisticsCounter2(String name, long period)
+ {
+ _period = period;
+ _name = name + "-" + + _counterIds.incrementAndGet();
+ reset();
+ }
+
+ public void registerEvent()
+ {
+ registerEvent(1L);
+ }
+
+ public void registerEvent(long value)
+ {
+ registerEvent(value, System.currentTimeMillis());
+ }
+
+ public void registerEvent(long value, long timestamp)
+ {
+ if (DISABLE_STATISTICS)
+ {
+ return;
+ }
+
+ long thisSample = (timestamp / _period);
+ synchronized (this)
+ {
+ if (thisSample > _last)
+ {
+ _last = thisSample;
+ _rate = _temp;
+ _temp = 0L;
+ if (_rate > _peak)
+ {
+ _peak = _rate;
+ }
+ }
+
+ _total += value;
+ _temp += value;
+ }
+ }
+
+ /**
+ * Update the current rate and peak - may reset rate to zero if a new
+ * sample period has started.
+ */
+ private void update()
+ {
+ registerEvent(0L, System.currentTimeMillis());
+ }
+
+ /**
+ * Reset
+ */
+ public void reset()
+ {
+ _peak = 0L;
+ _rate = 0L;
+ _total = 0L;
+ _start = System.currentTimeMillis();
+ _last = _start / _period;
+ }
+
+ public double getPeak()
+ {
+ update();
+ return (double) _peak / ((double) _period / 1000.0d);
+ }
+
+ public double getRate()
+ {
+ update();
+ return (double) _rate / ((double) _period / 1000.0d);
+ }
+
+ public long getTotal()
+ {
+ return _total;
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+
+ public Date getStartTime()
+ {
+ return new Date(_start);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getPeriod()
+ {
+ return _period;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index f1efbe5e53..4f32c266b5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -49,7 +48,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
@@ -61,7 +59,6 @@ import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
index 1e30946b01..a9bdcb2d21 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
@@ -76,22 +76,25 @@ public class StatisticsCounterTest extends TestCase
/**
* Test that the peak rate is reported correctly.
*/
- public void testPeak()
+ public void testPeak() throws Exception
{
StatisticsCounter counter = new StatisticsCounter("test", 1000L);
long start = counter.getStart();
assertEquals(0.0, counter.getPeak());
counter.registerEvent(1000, start + 500);
+ Thread.sleep(1250);
assertEquals(1000.0, counter.getPeak());
counter.registerEvent(2000, start + 1500);
+ Thread.sleep(1000);
assertEquals(2000.0, counter.getPeak());
counter.registerEvent(1000, start + 2500);
+ Thread.sleep(1000);
assertEquals(2000.0, counter.getPeak());
}
/**
- * Test that peak rate is reported correctly even when messages are
- * delivered out-of-order.
+ * Test that peak rate is reported correctly for out-of-order messages,
+ * and the total is also unaffected.
*/
public void testPeakOutOfOrder() throws Exception
{
@@ -99,18 +102,22 @@ public class StatisticsCounterTest extends TestCase
long start = counter.getStart();
assertEquals(0.0, counter.getPeak());
counter.registerEvent(1000, start + 2500);
- assertEquals(1000.0, counter.getPeak());
+ Thread.sleep(1250);
+ assertEquals(0.0, counter.getPeak());
counter.registerEvent(2000, start + 1500);
- assertEquals(3000.0, counter.getPeak());
+ Thread.sleep(1000);
+ assertEquals(0.0, counter.getPeak());
counter.registerEvent(1000, start + 500);
+ Thread.sleep(1500);
assertEquals(4000.0, counter.getPeak());
Thread.sleep(2000);
assertEquals(4000.0, counter.getPeak());
counter.registerEvent(1000, start + 500);
- assertEquals(5000.0, counter.getPeak());
+ assertEquals(4000.0, counter.getPeak());
Thread.sleep(2000);
counter.registerEvent(1000);
- assertEquals(5000.0, counter.getPeak());
+ assertEquals(4000.0, counter.getPeak());
+ assertEquals(6000, counter.getTotal());
}
/**
@@ -122,12 +129,13 @@ public class StatisticsCounterTest extends TestCase
assertEquals(0.0, counter.getRate());
Thread.sleep(100);
counter.registerEvent(1000);
+ Thread.sleep(1250);
assertEquals(1000.0, counter.getRate());
- Thread.sleep(1000);
counter.registerEvent(2000);
- assertEquals(2000.0, counter.getRate());
Thread.sleep(1000);
+ assertEquals(2000.0, counter.getRate());
counter.registerEvent(1000);
+ Thread.sleep(1000);
assertEquals(1000.0, counter.getRate());
Thread.sleep(1000);
assertEquals(0.0, counter.getRate());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
index 44bb368f5a..721a464c67 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
@@ -154,22 +154,23 @@ public class MessageStatisticsTest extends MessageStatisticsTestCase
*/
public void testMessagePeakRates() throws Exception
{
- sendUsing(_test, 1, 10000);
+ sendUsing(_test, 2, 10);
+ Thread.sleep(10 * 1000);
+ sendUsing(_dev, 4, 10);
Thread.sleep(10 * 1000);
- sendUsing(_dev, 10, 10);
ManagedBroker test = _jmxUtils.getManagedBroker("test");
ManagedBroker dev = _jmxUtils.getManagedBroker("development");
assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate());
- assertApprox("Incorrect test vhost peak data", 0.2d, 10000.0d, test.getPeakDataReceiptRate());
- assertApprox("Incorrect dev vhost peak messages", 0.2d, 10.0d, dev.getPeakMessageReceiptRate());
- assertApprox("Incorrect dev vhost peak data", 0.2d, 100.0d, dev.getPeakDataReceiptRate());
+ assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate());
+ assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate());
+ assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate());
if (!_broker.equals(VM))
{
- assertApprox("Incorrect server peak messages", 0.2d, 10.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate());
- assertApprox("Incorrect server peak data", 0.2d, 10000.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate());
+ assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate());
+ assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate());
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
index 8ca02eb79a..ac95ae176c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
@@ -105,7 +105,7 @@ public abstract class MessageStatisticsTestCase extends QpidTestCase
{
double min = expected * (1.0d - error);
double max = expected * (1.0d + error);
- String assertion = String.format("%s: expected %f +/- %d, actual %f",
+ String assertion = String.format("%s: expected %f +/- %d%%, actual %f",
message, expected, (int) (error * 100.0d), actual);
assertTrue(assertion, actual > min && actual < max);
}