summaryrefslogtreecommitdiff
path: root/java/integrationtests
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
commit9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 (patch)
tree593c9d3d5cd46191099b558d0ef2d3ef012f968d /java/integrationtests
parentf10117cd6464a107b086e0b7f7ea44a496b04c3d (diff)
downloadqpid-python-9bac85e36d60df28fb6f86e3bbe9e3f46689aa04.tar.gz
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line Added xml file for logging during sustained tests ........ r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line Immediate and mandatory flag tests added. ........ r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line QPID-509 Mandatory messages not returned outside a transaction. They are now. ........ r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki. http://cwiki.apache.org/qpid/sustained-tests.html ........ r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line Added intelij files to ignore list ........ r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line POM update to add Apache content to built jars ........ r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line Updated default guest password as it was not correct. ........ r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line Added additional information to log message when available to aid the explination of a failed connection ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java3
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java175
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java2
-rw-r--r--java/integrationtests/src/resources/sustained-log4j.xml69
4 files changed, 192 insertions, 57 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index a82b05e20f..a904bfa419 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -252,7 +252,8 @@ public class TestClient implements MessageListener
public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
index 1597da6dba..6f2089290a 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
@@ -37,6 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -91,7 +92,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
private static final long TEN_MILLI_SEC = 10000000;
- private static final long FIVE_MILLI_SEC = 5000000;
+ private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+ private static final int LOG_UPATE_INTERVAL = 10;
+ private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -129,6 +132,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+ String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
if (debugLog.isDebugEnabled())
{
@@ -150,7 +154,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
session = new Session[1];
connection[0] =
- org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
+ org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, ackMode);
@@ -182,6 +188,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
{
connection[i] =
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, ackMode);
@@ -192,7 +199,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
MessageConsumer consumer = session[i].createConsumer(sendDestination);
- consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
+ consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination));
}
break;
@@ -347,7 +354,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
_received++;
if (((TextMessage) message).getText().equals("start"))
{
- debugLog.info("Starting Batch");
+ debugLog.debug("Starting Batch");
_startTime = System.nanoTime();
}
else if (((TextMessage) message).getText().equals("end"))
@@ -355,8 +362,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (_startTime != null)
{
long currentTime = System.nanoTime();
- sendStatus(currentTime - _startTime, _received);
- debugLog.info("End Batch");
+ sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+ debugLog.debug("End Batch");
}
}
}
@@ -373,28 +380,31 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*
* @param time taken for the the last batch
* @param received Total number of messages received.
- *
+ * @param batchNumber the batch number
* @throws JMSException if an error occurs during the send
*/
- private void sendStatus(long time, long received) throws JMSException
+ private void sendStatus(long time, long received, int batchNumber) throws JMSException
{
Message updateMessage = _session.createTextMessage("update");
updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
updateMessage.setLongProperty("RECEIVED", received);
+ updateMessage.setIntProperty("BATCH", batchNumber);
updateMessage.setLongProperty("DURATION", time);
if (debugLog.isInfoEnabled())
{
- debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ debugLog.info("**** SENDING [" + batchNumber + "]**** "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
// Output on the main log.info the details of this batch
- if (received / _batchSize % 10 == 0)
+ if (batchNumber % 10 == 0)
{
- log.info("Sending Report [" + received / _batchSize + "] "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ log.info("Sending Report [" + batchNumber + "] "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
_updater.send(updateMessage);
@@ -415,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _messageVariance = 500; //no. messages to allow drifting
+ private long _batchVariance = 3; //no. batches to allow drifting
private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
@@ -451,18 +461,23 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
long duration = message.getLongProperty("DURATION");
long totalReceived = message.getLongProperty("RECEIVED");
String client = message.getStringProperty("CLIENT_ID");
+ int batchNumber = message.getIntProperty("BATCH");
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+ debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived
+ + " Recevied BATCH:" + batchNumber + " DURATION:" + duration);
}
- recordSlow(client, totalReceived);
-
- adjustDelay(client, totalReceived, duration);
+ recordSlow(client, totalReceived, batchNumber);
+ adjustDelay(client, batchNumber, duration);
- if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+ // Warm up completes when:
+ // we haven't warmed up
+ // and the number of batches sent to each client is at least half of the required warmup batches
+ if (!_warmedup
+ && (batchNumber >= _warmUpBatches))
{
_warmedup = true;
_warmup.countDown();
@@ -478,7 +493,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
CountDownLatch _warmup = new CountDownLatch(1);
- int _warmUpBatches = 20;
+ int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
int _numBatches = 10000;
@@ -527,12 +542,14 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
testMessage = _client.session[0].createTextMessage("start");
- for (int batch = 0; batch < batchSize; batch++)
+ for (int batch = 0; batch <= batchSize; batch++)
// while (_running)
{
long start = System.nanoTime();
testMessage.setText("start");
+ testMessage.setIntProperty("BATCH", batch);
+
_client.producer.send(testMessage);
_rateAdapter.sentMessage();
@@ -552,9 +569,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
long sendtime = end - start;
- debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ if (debugLog.isDebugEnabled())
+ {
+ debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ }
- if (batch % 10 == 0)
+ if (batch % LOG_UPATE_INTERVAL == 0)
{
log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
}
@@ -583,23 +603,17 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
return;
}
- //Slow down if gap between send and received is too large
- if (_sent - _totalReceived / delays.size() > _messageVariance)
- {
- //pause between batches.
- debugLog.info("Sleeping to keep sent in check with received");
- log.debug("Increaseing _delay as sending more than receiving");
- _delay += TEN_MILLI_SEC;
- }
-
- //per batch sleep.. if sleep is to small to spread over the batch.
- if (_delay <= TEN_MILLI_SEC * _batchSize)
- {
- sleepLong(_delay);
- }
- else
+ if (!SLEEP_PER_MESSAGE)
{
- debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ //per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= TEN_MILLI_SEC * _batchSize)
+ {
+ sleepLong(_delay);
+ }
+ else
+ {
+ debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ }
}
}
@@ -617,10 +631,10 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
* Adjust the delay for sending messages based on this update from the client
*
* @param client The client that send this update
- * @param totalReceived The number of messages that this client has received.
* @param duration The time taken for the last batch of messagse
+ * @param batchNumber The reported batchnumber from the client
*/
- private void adjustDelay(String client, long totalReceived, long duration)
+ private void adjustDelay(String client, int batchNumber, long duration)
{
//Retrieve the current total time taken for this client.
Long currentTime = delays.get(client);
@@ -637,23 +651,28 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
delays.put(client, currentTime);
+ long batchesSent = _sent / _batchSize;
+
+ // ensure we don't divide by zero
+ if (batchesSent == 0)
+ {
+ batchesSent = 1L;
+ }
_totalReceived += _batchSize;
_totalDuration += duration;
- // Calculate the number of messages in the batch.
- long batchCount = (_totalReceived / _batchSize);
-
//calculate average duration accross clients per batch
- long averageDuration = _totalDuration / delays.size() / batchCount;
+ long averageDuration = _totalDuration / delays.size() / batchesSent;
//calculate the difference between current send delay and average report delay
long diff = (duration) - averageDuration;
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
- + " on batch: " + batchCount
+ debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers."
+ + " on batch: " + batchesSent
+ + " received batch: " + batchNumber
+ " Batch Duration: " + duration
+ " Average: " + averageDuration
+ " so diff: " + diff + " for : " + client
@@ -696,6 +715,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
delayStable();
}
+ // If we have a consumer that is behind with the batches.
+ if (batchesSent - batchNumber > _batchVariance)
+ {
+ debugLog.debug("Increasing _delay as sending more than receiving");
+
+ _delay += 2 * TEN_MILLI_SEC;
+ delayChanged();
+ }
+
+
}
/** Reset the number of iterations before we say the delay has stabilised. */
@@ -725,10 +754,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*
* @param client The client identifier to check
* @param received the number of messages received by that client
+ * @param batchNumber
*/
- private void recordSlow(String client, long received)
+ private void recordSlow(String client, long received, int batchNumber)
{
- if (received < (_sent - _messageVariance))
+ if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
{
_slowClients.put(client, received);
}
@@ -761,6 +791,13 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
}
+ else
+ {
+ if (SLEEP_PER_MESSAGE && (_delay > 0))
+ {
+ sleepLong(_delay / _batchSize);
+ }
+ }
}
@@ -771,16 +808,38 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
private boolean checkForSlowClients()
{
- if (_sent % _batchSize == 0)
+ // This will allways be true as we are running this at the end of each batchSize
+// if (_sent % _batchSize == 0)
{
// Cause test to pause when we have slow
if (!_slowClients.isEmpty() || NO_CLIENTS)
{
- debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
+
while (!_slowClients.isEmpty())
{
- debugLog.info(_slowClients.size() + " slow clients.");
+ if (debugLog.isInfoEnabled()
+ && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0)
+ {
+ String clients = "";
+ Iterator it = _slowClients.keySet().iterator();
+ while (it.hasNext())
+ {
+ clients += it.next();
+ if (it.hasNext())
+ {
+ clients += ", ";
+ }
+ }
+ debugLog.info("Pausing for slow clients:" + clients);
+ }
+
+
+ if (log.isDebugEnabled()
+ && _sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.debug(_slowClients.size() + " slow clients.");
+ }
sleep(PAUSE_SLEEP);
}
@@ -794,7 +853,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
else
{
- debugLog.info("Delay:" + _delay);
+ if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.info("Total Delay :" + _delay + " "
+ + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")"));
+ }
}
}
@@ -825,7 +888,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
* Perform the sleep , swallowing any InteruptException.
*
* NOTE: If a sleep request is > 10s then reset only sleep for 5s
- *
+ *
* @param milli to sleep for
* @param nano sub miliseconds to sleep for
*/
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
index b437e165b4..0075e45a8c 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
@@ -113,6 +113,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
setPropertiesOnMessage(assignSender, testProperties);
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
+ assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
senderConversation.send(senderControlTopic, assignSender);
@@ -170,6 +171,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
setPropertiesOnMessage(assignReceiver, _testProperties);
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
+ assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName);
receiverConversation.send(receiverControlTopic, assignReceiver);
diff --git a/java/integrationtests/src/resources/sustained-log4j.xml b/java/integrationtests/src/resources/sustained-log4j.xml
new file mode 100644
index 0000000000..c5ab3137bf
--- /dev/null
+++ b/java/integrationtests/src/resources/sustained-log4j.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+
+ <appender name="FileAppender" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/>
+ <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/-->
+ </layout>
+ </appender>
+
+ <category name="SustainedTest">
+ <priority value="${sustained.level}"/>
+ </category>
+
+ <category name="org.apache">
+ <priority value="warn"/>
+ </category>
+
+ <category name="org.apache.qpid.interop">
+ <priority value="${interop.logging.level}"/>
+ </category>
+
+
+ <category name="org.apache.qpid.sustained">
+ <priority value="${amqj.logging.level}"/>
+ </category>
+
+ <!--category name="org.apache.qpid.server.txn">
+ <priority value="debug"/>
+ </category>-->
+
+ <root>
+ <priority value="all"/>
+ <appender-ref ref="STDOUT"/>
+ <!--appender-ref ref="ArchivingFileAppender"/-->
+ </root>
+</log4j:configuration>