diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-14 14:20:36 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-14 14:20:36 +0000 |
commit | 7e0a6783f0d8b732300437514ed5ab5ebe3e84f2 (patch) | |
tree | 1ce2a7024f4e47a0a044b7610d91b2636d9d349b | |
parent | ca87436abf2e423ba95d628c4654ea4308f9a9f9 (diff) | |
download | qpid-python-7e0a6783f0d8b732300437514ed5ab5ebe3e84f2.tar.gz |
QPID-2970: Add dlq proprty to didsable dlq creation in tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1049101 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 44 insertions, 28 deletions
diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties index 2c14f20920..d255db5693 100644 --- a/qpid/java/perftests/etc/dlq/config.properties +++ b/qpid/java/perftests/etc/dlq/config.properties @@ -41,6 +41,8 @@ queue = test ## dlq properties ## +# should the queue be created with dead letter queue enabled +dlq = true # maximum times a message will be redelivered before dlq maxRedelivery = 3 # maxRecords must be greater than max(maxPrefetch, count) * threads diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java index aa7b510659..a7ea9cd098 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java @@ -42,16 +42,19 @@ public class Check extends Client public Integer call() throws Exception { start(); - Message msg; - while ((msg = _consumer.receive(1000)) != null) + if (_dlq) { - int number = msg.getIntProperty("number"); - boolean rejectMessage = (number % _reject) == 0; - if (!rejectMessage) - { - throw new RuntimeException("unexpected message on dlq: " + number); - } - _check++; + Message msg; + while ((msg = _consumer.receive(1000)) != null) + { + int number = msg.getIntProperty("number"); + boolean rejectMessage = (number % _reject) == 0; + if (!rejectMessage) + { + throw new RuntimeException("unexpected message on dlq: " + number); + } + _check++; + } } return Integer.valueOf(_check); } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java index 81517b6fbc..e50ec7e5b5 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java @@ -38,11 +38,12 @@ public abstract class Client implements Callable<Integer> protected boolean _clientAck; protected String _queueName; protected int _count; - protected boolean _messageIds; + protected boolean _messageIdsDisabled; protected boolean _persistent; protected int _size; protected int _threads; protected int _maxRecords; + protected boolean _dlq; protected Connection _connection; protected Session _session; @@ -68,9 +69,10 @@ public abstract class Client implements Callable<Integer> _persistent = Boolean.parseBoolean(_props.getProperty(PERSISTENT)); _count = Integer.parseInt(_props.getProperty(COUNT)); _size = Integer.parseInt(_props.getProperty(SIZE)); - _messageIds = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS)); + _messageIdsDisabled = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS)); _threads = Integer.parseInt(_props.getProperty(THREADS)); _maxRecords = Integer.parseInt(_props.getProperty(MAX_RECORDS)); + _dlq = Boolean.parseBoolean(_props.getProperty(DLQ)); } public void shutdown() diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java index 343c4134a0..58f335d2fc 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java @@ -17,6 +17,7 @@ public interface Config String REJECT = "reject"; String REJECT_COUNT = "rejectCount"; String REPEAT = "repeat"; + String DLQ = "dlq"; String SESSION_TRANSACTED = "SESSION_TRANSACTED"; String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE"; diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java index 310591b85b..83d5d58d6d 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java @@ -41,7 +41,7 @@ public class Create extends Client _queue = new AMQQueue(burl); final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true); + arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), _dlq); ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, false, false, arguments); ((AMQSession<?,?>) _session).declareAndBind((AMQDestination) new AMQQueue("amq.direct", _queueName)); @@ -50,9 +50,12 @@ public class Create extends Client while (_consumer.receive(1000) != null); _consumer.close(); - _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); - _consumer = _session.createConsumer(_queue); - while (_consumer.receive(1000) != null); + if (_dlq) + { + _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + _consumer = _session.createConsumer(_queue); + while (_consumer.receive(1000) != null); + } } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java index 0de3bedded..22f590d893 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java @@ -32,6 +32,7 @@ public class Receiver extends Client private static AtomicInteger _rejectedCount; private static int _consumedCheck; private static int _rejectedCheck; + private static boolean _sessionOk; public Receiver(Properties props) { @@ -57,10 +58,10 @@ public class Receiver extends Client _reject = Integer.parseInt(_props.getProperty(REJECT)); _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT)); - boolean sessionOk = (_transacted || _clientAck) || + _sessionOk = (_transacted || _clientAck) || ((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == Session.DUPS_OK_ACKNOWLEDGE) && _listener); - _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject; - _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count / _reject) * _rejectCount) : 0); + _rejectedCheck = (!_sessionOk || _messageIdsDisabled || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject; + _consumedCheck = (_count - _rejectedCheck); // + (_sessionOk ? ((_count / _reject) * _rejectCount) : 0); _consumer = _session.createConsumer(_queue); @@ -111,25 +112,28 @@ public class Receiver extends Client } rejectCount = _rejected.get(number) + 1; _rejected.put(number, rejectCount); - if (rejectCount <= _rejectCount) + if (rejectCount <= _rejectCount && rejectCount <=_maxRedelivery) { - if (rejectCount == _maxRedelivery) + if (_dlq && _sessionOk && rejectCount == _maxRedelivery) { _rejectedCount.incrementAndGet(); _log.debug("client " + _client + " rejecting message (" + rejectCount + ") " + msg.getJMSMessageID()); } - if (rejectCount > _maxRedelivery) - { - throw new RuntimeException("client " + _client + " received message " + msg.getJMSMessageID() + - " " + rejectCount + " times"); - } if (_transacted) { + _log.debug("client " + _client + " rollback of message (" + rejectCount + ") " + msg.getJMSMessageID()); _session.rollback(); } else { - _session.recover(); + if (_sessionOk) + { + _session.recover(); + } + else + { + rejectMessage = false; + } } } else @@ -142,6 +146,7 @@ public class Receiver extends Client { _receivedCount++; _totalConsumedCount.incrementAndGet(); + _log.debug("client " + _client + " consumed message " + _receivedCount + " of " + _totalConsumedCount.get()); if (_transacted) { _session.commit(); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java index e6c2ff49e2..3316b1ec56 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java @@ -40,7 +40,7 @@ public class Sender extends Client { _producer = _session.createProducer(_queue); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - _producer.setDisableMessageID(_messageIds); + _producer.setDisableMessageID(_messageIdsDisabled); _connection.start(); } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java index c0091c5fac..bb636bfe74 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java @@ -104,7 +104,7 @@ public class PerformanceTest for (Future<Integer> receive : receives) { _consumed += receive.get(); - } + } Client check = new Check(_props); check.connect(); |