summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-12-14 14:20:36 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-12-14 14:20:36 +0000
commit7e0a6783f0d8b732300437514ed5ab5ebe3e84f2 (patch)
tree1ce2a7024f4e47a0a044b7610d91b2636d9d349b
parentca87436abf2e423ba95d628c4654ea4308f9a9f9 (diff)
downloadqpid-python-7e0a6783f0d8b732300437514ed5ab5ebe3e84f2.tar.gz
QPID-2970: Add dlq proprty to didsable dlq creation in tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1049101 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/etc/dlq/config.properties2
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java21
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java6
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java1
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java11
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java27
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java2
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java2
8 files changed, 44 insertions, 28 deletions
diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties
index 2c14f20920..d255db5693 100644
--- a/qpid/java/perftests/etc/dlq/config.properties
+++ b/qpid/java/perftests/etc/dlq/config.properties
@@ -41,6 +41,8 @@ queue = test
## dlq properties
##
+# should the queue be created with dead letter queue enabled
+dlq = true
# maximum times a message will be redelivered before dlq
maxRedelivery = 3
# maxRecords must be greater than max(maxPrefetch, count) * threads
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
index aa7b510659..a7ea9cd098 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
@@ -42,16 +42,19 @@ public class Check extends Client
public Integer call() throws Exception {
start();
- Message msg;
- while ((msg = _consumer.receive(1000)) != null)
+ if (_dlq)
{
- int number = msg.getIntProperty("number");
- boolean rejectMessage = (number % _reject) == 0;
- if (!rejectMessage)
- {
- throw new RuntimeException("unexpected message on dlq: " + number);
- }
- _check++;
+ Message msg;
+ while ((msg = _consumer.receive(1000)) != null)
+ {
+ int number = msg.getIntProperty("number");
+ boolean rejectMessage = (number % _reject) == 0;
+ if (!rejectMessage)
+ {
+ throw new RuntimeException("unexpected message on dlq: " + number);
+ }
+ _check++;
+ }
}
return Integer.valueOf(_check);
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
index 81517b6fbc..e50ec7e5b5 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
@@ -38,11 +38,12 @@ public abstract class Client implements Callable<Integer>
protected boolean _clientAck;
protected String _queueName;
protected int _count;
- protected boolean _messageIds;
+ protected boolean _messageIdsDisabled;
protected boolean _persistent;
protected int _size;
protected int _threads;
protected int _maxRecords;
+ protected boolean _dlq;
protected Connection _connection;
protected Session _session;
@@ -68,9 +69,10 @@ public abstract class Client implements Callable<Integer>
_persistent = Boolean.parseBoolean(_props.getProperty(PERSISTENT));
_count = Integer.parseInt(_props.getProperty(COUNT));
_size = Integer.parseInt(_props.getProperty(SIZE));
- _messageIds = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
+ _messageIdsDisabled = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
_threads = Integer.parseInt(_props.getProperty(THREADS));
_maxRecords = Integer.parseInt(_props.getProperty(MAX_RECORDS));
+ _dlq = Boolean.parseBoolean(_props.getProperty(DLQ));
}
public void shutdown()
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
index 343c4134a0..58f335d2fc 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
@@ -17,6 +17,7 @@ public interface Config
String REJECT = "reject";
String REJECT_COUNT = "rejectCount";
String REPEAT = "repeat";
+ String DLQ = "dlq";
String SESSION_TRANSACTED = "SESSION_TRANSACTED";
String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
index 310591b85b..83d5d58d6d 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
@@ -41,7 +41,7 @@ public class Create extends Client
_queue = new AMQQueue(burl);
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true);
+ arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), _dlq);
((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, false, false, arguments);
((AMQSession<?,?>) _session).declareAndBind((AMQDestination) new AMQQueue("amq.direct", _queueName));
@@ -50,9 +50,12 @@ public class Create extends Client
while (_consumer.receive(1000) != null);
_consumer.close();
- _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
- _consumer = _session.createConsumer(_queue);
- while (_consumer.receive(1000) != null);
+ if (_dlq)
+ {
+ _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ _consumer = _session.createConsumer(_queue);
+ while (_consumer.receive(1000) != null);
+ }
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
index 0de3bedded..22f590d893 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
@@ -32,6 +32,7 @@ public class Receiver extends Client
private static AtomicInteger _rejectedCount;
private static int _consumedCheck;
private static int _rejectedCheck;
+ private static boolean _sessionOk;
public Receiver(Properties props)
{
@@ -57,10 +58,10 @@ public class Receiver extends Client
_reject = Integer.parseInt(_props.getProperty(REJECT));
_rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
- boolean sessionOk = (_transacted || _clientAck) ||
+ _sessionOk = (_transacted || _clientAck) ||
((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == Session.DUPS_OK_ACKNOWLEDGE) && _listener);
- _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
- _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count / _reject) * _rejectCount) : 0);
+ _rejectedCheck = (!_sessionOk || _messageIdsDisabled || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
+ _consumedCheck = (_count - _rejectedCheck); // + (_sessionOk ? ((_count / _reject) * _rejectCount) : 0);
_consumer = _session.createConsumer(_queue);
@@ -111,25 +112,28 @@ public class Receiver extends Client
}
rejectCount = _rejected.get(number) + 1;
_rejected.put(number, rejectCount);
- if (rejectCount <= _rejectCount)
+ if (rejectCount <= _rejectCount && rejectCount <=_maxRedelivery)
{
- if (rejectCount == _maxRedelivery)
+ if (_dlq && _sessionOk && rejectCount == _maxRedelivery)
{
_rejectedCount.incrementAndGet();
_log.debug("client " + _client + " rejecting message (" + rejectCount + ") " + msg.getJMSMessageID());
}
- if (rejectCount > _maxRedelivery)
- {
- throw new RuntimeException("client " + _client + " received message " + msg.getJMSMessageID() +
- " " + rejectCount + " times");
- }
if (_transacted)
{
+ _log.debug("client " + _client + " rollback of message (" + rejectCount + ") " + msg.getJMSMessageID());
_session.rollback();
}
else
{
- _session.recover();
+ if (_sessionOk)
+ {
+ _session.recover();
+ }
+ else
+ {
+ rejectMessage = false;
+ }
}
}
else
@@ -142,6 +146,7 @@ public class Receiver extends Client
{
_receivedCount++;
_totalConsumedCount.incrementAndGet();
+ _log.debug("client " + _client + " consumed message " + _receivedCount + " of " + _totalConsumedCount.get());
if (_transacted)
{
_session.commit();
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
index e6c2ff49e2..3316b1ec56 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
@@ -40,7 +40,7 @@ public class Sender extends Client
{
_producer = _session.createProducer(_queue);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- _producer.setDisableMessageID(_messageIds);
+ _producer.setDisableMessageID(_messageIdsDisabled);
_connection.start();
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
index c0091c5fac..bb636bfe74 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
@@ -104,7 +104,7 @@ public class PerformanceTest
for (Future<Integer> receive : receives)
{
_consumed += receive.get();
- }
+ }
Client check = new Check(_props);
check.connect();