summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
committerKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
commit4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (patch)
tree8f5a5c8e728615f6442f9e317518817f15a3ee74 /java
parent907330f70818a437f7a0723743ab98b355d80d67 (diff)
downloadqpid-python-4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd.tar.gz
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java206
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java27
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java40
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java89
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java62
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java114
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java100
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java360
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java36
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java66
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java104
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java54
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java64
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURL.java9
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java19
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java43
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java2
-rw-r--r--java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java1
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java27
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java695
-rwxr-xr-xjava/test-profiles/CPPExcludes3
51 files changed, 2610 insertions, 172 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 76b6dad996..01a0d9900d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -20,8 +20,8 @@ package org.apache.qpid.server;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -243,7 +244,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
*/
public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
{
- AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
+ createNewQueue(queueName, owner, durable, null);
+ }
+
+ public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
+ {
+ final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
+ AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
if (queue != null)
{
throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -258,11 +265,18 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
ownerShortString = new AMQShortString(owner);
}
+ FieldTable args = null;
+ if(arguments != null)
+ {
+ args = FieldTable.convertToFieldTable(arguments);
+ }
final VirtualHost virtualHost = getVirtualHost();
- queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, virtualHost, null);
+
+ queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
+ false, false, getVirtualHost(), args);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _durableConfig.createQueue(queue);
+ _durableConfig.createQueue(queue, args);
}
virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 34bc57a826..a4fd997568 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -692,6 +693,31 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
+ public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ return maximumDeliveryCount > 0;
+ }
+
+ return false;
+ }
+
+ public boolean isDeliveredTooManyTimes(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int numDeliveries = queueEntry.getDeliveryCount();
+ return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
+ }
+
+ return false;
+ }
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
@@ -739,9 +765,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
-
- ServerMessage msg = message.getMessage();
AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
@@ -799,6 +825,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
+
_unacknowledgedMessageMap.remove(deliveryTag);
message.setRedelivered();
@@ -1058,6 +1088,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_session.registerMessageDelivered(entry.getMessage().getSize());
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
+ entry.incrementDeliveryCount();
}
};
@@ -1246,7 +1277,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
private final Collection<QueueEntry> _ackedMessages;
-
public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
{
_ackedMessages = ackedMessages;
@@ -1479,4 +1509,54 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
}
}
+
+ public void deadLetter(long deliveryTag) throws AMQException
+ {
+ final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+ final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+ if (rejectedQueueEntry == null)
+ {
+ _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+ return;
+ }
+ else
+ {
+ final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+
+ final Exchange altExchange = queue.getAlternateExchange();
+ unackedMap.remove(deliveryTag);
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
+
+ final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ rejectedQueueEntry.routeToAlternate();
+
+ //output operational logging for each delivery post commit
+ for (final BaseQueue destinationQueue : destinationQueues)
+ {
+ _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+ }
+
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 31c683b548..b8437c8430 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -63,7 +63,9 @@ public class QueueConfiguration extends ConfigurationPlugin
"flowResumeCapacity",
"lvq",
"lvqKey",
- "sortKey"
+ "sortKey",
+ "maximumDeliveryCount",
+ "deadLetterQueues"
};
}
@@ -173,6 +175,19 @@ public class QueueConfiguration extends ConfigurationPlugin
return getStringValue("sortKey", null);
}
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
+ }
+
public static class QueueConfig extends ConfigurationPlugin
{
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e13e2f4d8f..4b42e39aa1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -40,6 +40,8 @@ import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -810,4 +812,34 @@ public class ServerConfiguration extends ConfigurationPlugin
{
return getBooleanValue("management.managementRightsInferAllAccess", true);
}
+
+ public int getMaxDeliveryCount()
+ {
+ return getConfig().getInt("maximumDeliveryCount", 0);
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, defaults to disabled if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getConfig().getBoolean("deadLetterQueues", false);
+ }
+
+ /**
+ * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+ */
+ public String getDeadLetterExchangeSuffix()
+ {
+ return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+ /**
+ * String to affix to end of queue name when generating a queue for DLQ purposes.
+ */
+ public String getDeadLetterQueueSuffix()
+ {
+ return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 6729a5ce0f..c4e4f701a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -347,4 +347,18 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
{
return getLongValue("transactionTimeout.idleClose", 0L);
}
+
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 7837a9bc38..102cd42ac3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+ public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
private final VirtualHost _host;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index bd75f7bc51..76f86ea1b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -117,7 +117,7 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean isBound(AMQShortString routingKey)
@@ -129,7 +129,7 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean hasBindings()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index ef9711004d..bbb009003c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -127,8 +127,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
{
- int _msg;
-
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
@@ -137,6 +135,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
+ entry.incrementDeliveryCount();
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 62dd76f832..0ea88e4ab6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting:" + body.getDeliveryTag() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (message == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
if (message.isQueueDeleted())
{
- _logger.warn("Message's Queue as already been purged, unable to Reject. " +
- "Dropping message should use Dead Letter Queue");
+ _logger.warn("Message's Queue has already been purged, dropping message");
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
message.discard();
}
- //sendtoDeadLetterQueue(msg)
return;
}
if (message.getMessage() == null)
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("Message has already been purged, unable to Reject.");
return;
}
@@ -98,27 +94,44 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
- // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
- //if (!evt.getMethod().resend)
- {
- message.reject();
- }
+ message.reject();
if (body.getRequeue())
{
channel.requeue(deliveryTag);
+
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
}
else
{
- _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- //sendtoDeadLetterQueue(AMQMessage message)
-// message.queue = channel.getDefaultDeadLetterQueue();
-// channel.requeue(deliveryTag);
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 4643dee0a3..20ba3af458 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -72,7 +72,12 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
};
channel.rollback(task);
-
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ channel.resend(false);
+
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index ed8c0d0ce9..b5df212904 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control Removed
# 0 - time in milliseconds
OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
+
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index db32f13d8d..32bf8aa17d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
@@ -65,7 +66,6 @@ public class AMQMessage extends AbstractServerMessageImpl
WeakReference<AMQChannel> _channelRef;
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
@@ -122,7 +122,15 @@ public class AMQMessage extends AbstractServerMessageImpl
public String getRoutingKey()
{
- // TODO
+ MessageMetaData messageMetaData = getMessageMetaData();
+ if (messageMetaData != null)
+ {
+ AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+ if (routingKey != null)
+ {
+ return routingKey.asString();
+ }
+ }
return null;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 9140a13625..6dfdc5e8b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -213,6 +213,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void setAlternateExchange(Exchange exchange);
+ void setAlternateExchange(String exchangeName);
+
Map<String, Object> getArguments();
void checkCapacity(AMQChannel channel);
@@ -272,4 +274,22 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
ManagedObject getManagedObject();
void setExclusive(boolean exclusive) throws AMQException;
+
+ /**
+ * Gets the maximum delivery count. If a message on this queue
+ * is delivered more than maximumDeliveryCount, the message will be
+ * routed to the {@link #getAlternateExchange()} (if set), or otherwise
+ * discarded. 0 indicates that maximum deliver count should not be enforced.
+ *
+ * @return maximum delivery count
+ */
+ int getMaximumDeliveryCount();
+
+ /**
+ * Sets the maximum delivery count.
+ *
+ * @param maximumDeliveryCount maximum delivery count
+ */
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index e4a6f01930..1b15bafb49 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -24,9 +24,15 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
@@ -37,6 +43,11 @@ public class AMQQueueFactory
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+
private abstract static class QueueProperty
{
@@ -80,6 +91,24 @@ public class AMQQueueFactory
}
+ private abstract static class QueueIntegerProperty extends QueueProperty
+ {
+ public QueueIntegerProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).intValue());
+ }
+
+ }
+ abstract void setPropertyValue(AMQQueue queue, int value);
+ }
+
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
new QueueLongProperty("x-qpid-maximum-message-age")
{
@@ -122,8 +151,14 @@ public class AMQQueueFactory
{
queue.setFlowResumeCapacity(value);
}
+ },
+ new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ {
+ public void setPropertyValue(AMQQueue queue, int value)
+ {
+ queue.setMaximumDeliveryCount(value);
+ }
}
-
};
@@ -149,8 +184,13 @@ public class AMQQueueFactory
String owner,
boolean autoDelete,
boolean exclusive,
- VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+ VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
// Access check
if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
{
@@ -158,6 +198,13 @@ public class AMQQueueFactory
throw new AMQSecurityException(description);
}
+ QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+ boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+ if (isDLQEnabled)
+ {
+ validateDLNames(queueName);
+ }
+
int priorities = 1;
String conflationKey = null;
String sortingKey = null;
@@ -219,10 +266,63 @@ public class AMQQueueFactory
}
}
- return q;
+ if(isDLQEnabled)
+ {
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
- }
+ final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+ virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(X_QPID_DLQ_ENABLED, false);
+ args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+
+ //enter the dlq in the persistent store
+ virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+ }
+ q.setAlternateExchange(dlExchange);
+ }
+ return q;
+ }
public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
{
@@ -250,10 +350,108 @@ public class AMQQueueFactory
arguments = new HashMap<String,Object>();
arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
}
+ if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
+ {
+ if (arguments == null)
+ {
+ arguments = new HashMap<String,Object>();
+ }
+ arguments.put(X_QPID_DLQ_ENABLED, true);
+ }
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
q.configure(config);
return q;
}
+
+ /**
+ * Validates DLQ and DLE names
+ * <p>
+ * DLQ name and DLQ exchange name need to be validated in order to keep
+ * integrity in cases when queue name passes validation check but DLQ name
+ * or DL exchange name fails to pass it. Otherwise, we might have situations
+ * when queue is created but DL exchange or/and DLQ creation fail.
+ * <p>
+ *
+ * @param name
+ * queue name
+ * @throws IllegalArgumentException
+ * thrown if length of queue name or exchange name exceed 255
+ */
+ protected static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ }
+
+ /**
+ * Checks if DLQ is enabled for the queue.
+ *
+ * @param autoDelete
+ * queue auto-delete flag
+ * @param arguments
+ * queue arguments
+ * @param qConfig
+ * queue configuration
+ * @return true if DLQ enabled
+ */
+ protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ {
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if (!autoDelete)
+ {
+ boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+ dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ }
+ return dlqEnabled;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Generates a dead letter queue name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DLQ name
+ */
+ protected static String getDeadLetterQueueName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+ return dlQueueName;
+ }
+
+ /**
+ * Generates a dead letter exchange name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DL exchange name
+ */
+ protected static String getDeadLetterExchangeName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+ return dlExchangeName;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index d58d95c801..b4765d6227 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
@@ -80,7 +81,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final String _queueName;
// OpenMBean data types for viewMessages method
- private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
+ private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
@@ -139,6 +140,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
+ _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count
_messageDataType = new CompositeType("Message", "AMQ Message",
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
@@ -177,6 +179,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _queue.getMessageCount();
}
+ public Integer getMaximumDeliveryCount()
+ {
+ return _queue.getMaximumDeliveryCount();
+ }
+
public Long getMaximumMessageSize()
{
return _queue.getMaximumMessageSize();
@@ -295,6 +302,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ _queue.setAlternateExchange(exchangeName);
+ }
+
+ public String getAlternateExchange()
+ {
+ Exchange exchange = _queue.getAlternateExchange();
+ String name = exchange == null ? null : exchange.getName();
+ return name == null ? null : name;
+ }
+
/**
* Checks if there is any notification to be send to the listeners
*/
@@ -472,7 +491,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
headerAttributes = getMessageHeaderProperties(headerBody);
- itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else if(serverMsg instanceof MessageTransferMessage)
{
@@ -481,13 +500,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
// Create header attributes list
headerAttributes = getMessageTransferMessageHeaderProps(msg);
- itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else
{
//unknown message
headerAttributes = new String[]{"N/A"};
- itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
CompositeData messageData = new CompositeDataSupport(_messageDataType,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index 77da08d8c4..26112d9f53 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
-class InboundMessageAdapter implements InboundMessage
+public class InboundMessageAdapter implements InboundMessage
{
private QueueEntry _entry;
@@ -33,7 +33,7 @@ class InboundMessageAdapter implements InboundMessage
{
}
- InboundMessageAdapter(QueueEntry entry)
+ public InboundMessageAdapter(QueueEntry entry)
{
_entry = entry;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index c1fb0258fa..37fad54c07 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -1,5 +1,7 @@
package org.apache.qpid.server.queue;
+import java.util.Collection;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.ServerMessage;
@@ -234,4 +236,16 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
* @return true if entry is either DEQUED or DELETED state
*/
boolean isDispensed();
+
+ /**
+ * Number of times this queue entry has been delivered.
+ *
+ * @return delivery count
+ */
+ int getDeliveryCount();
+
+ void incrementDeliveryCount();
+
+ void decrementDeliveryCount();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ee1d214c1f..5bb5dc3462 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -31,10 +31,13 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -80,6 +83,12 @@ public abstract class QueueEntryImpl implements QueueEntry
private volatile int _deliveryState;
+ /** Number of times this message has been delivered */
+ private volatile int _deliveryCount = 0;
+ private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
+ .newUpdater(QueueEntryImpl.class, "_deliveryCount");
+
+
public QueueEntryImpl(QueueEntryList<?> queueEntryList)
{
@@ -406,50 +415,51 @@ public abstract class QueueEntryImpl implements QueueEntry
public void routeToAlternate()
{
final AMQQueue currentQueue = getQueue();
- Exchange alternateExchange = currentQueue.getAlternateExchange();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
- if(alternateExchange != null)
+ if (alternateExchange != null)
+ {
+ final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
- final ServerMessage message = getMessage();
- if(rerouteQueues != null && rerouteQueues.size() != 0)
- {
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
- public void postCommit()
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ try
{
- try
+ for (BaseQueue queue : rerouteQueues)
{
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
+ queue.enqueue(message);
}
}
-
- public void onRollback()
+ catch (AMQException e)
{
-
+ throw new RuntimeException(e);
}
- });
- txn.dequeue(currentQueue,message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- discard();
- }
-
- public void onRollback()
- {
-
- }
- });
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
}
}
}
@@ -524,4 +534,19 @@ public abstract class QueueEntryImpl implements QueueEntry
return _state.isDispensed();
}
+ public int getDeliveryCount()
+ {
+ return _deliveryCount;
+ }
+
+ public void incrementDeliveryCount()
+ {
+ _deliveryCountUpdater.incrementAndGet(this);
+ }
+
+ public void decrementDeliveryCount()
+ {
+ _deliveryCountUpdater.decrementAndGet(this);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ab47d89e01..7717c8ebfc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -187,7 +187,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private long _createTime = System.currentTimeMillis();
private ConfigurationPlugin _queueConfiguration;
-
+ /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
+ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -356,6 +357,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_alternateExchange = exchange;
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ if(exchangeName == null || exchangeName.equals(""))
+ {
+ _alternateExchange = null;
+ return;
+ }
+
+ Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
+ if (exchange == null)
+ {
+ throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
+ }
+ setAlternateExchange(exchange);
+ }
+
public Map<String, Object> getArguments()
{
return _arguments;
@@ -521,13 +538,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//Reconfigure the queue for to reflect this new binding.
ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
- }
-
if (config != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
+ }
// Reconfigure with new config.
configure(config);
}
@@ -2108,6 +2124,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
_capacity = ((QueueConfiguration)config).getCapacity();
_flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
}
@@ -2229,4 +2246,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _logActor;
}
+
+ public int getMaximumDeliveryCount()
+ {
+ return _maximumDeliveryCount;
+ }
+
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+ {
+ _maximumDeliveryCount = maximumDeliveryCount;
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index fdd533b704..7d128f2bc5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -20,18 +20,31 @@
*/
package org.apache.qpid.server;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
+public class AMQBrokerManagerMBeanTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
@@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName)));
}
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the
+ * maximum delivery count to be set on the Queue.
+ */
+ public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception
+ {
+ final Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount");
+
+ final QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ final AMQQueue createdQueue = qReg.getQueue(queueName);
+ assertNotNull("The queue was not registered as expected", createdQueue);
+ assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount());
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of
+ * a Priority Queue.
+ */
+ public void testCreatePriorityQueue() throws Exception
+ {
+ int numPriorities = 7;
+ Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities);
+
+ AMQShortString queueName = new AMQShortString("testCreatePriorityQueue");
+
+ QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ AMQQueue queue = qReg.getQueue(queueName);
+ assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities());
+ }
+
@Override
public void setUp() throws Exception
{
super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test");
+
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _vHost.getQueueRegistry();
_exchangeRegistry = _vHost.getExchangeRegistry();
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index 9941c00499..e1a5e7d338 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -24,6 +24,8 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
public class QueueConfigurationTest extends TestCase
{
@@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase
fullEnv.setProperty("queues.maximumMessageSize", 1);
fullEnv.setProperty("queues.maximumMessageCount", 1);
fullEnv.setProperty("queues.minimumAlertRepeatGap", 1);
+ fullEnv.setProperty("queues.deadLetterQueues", true);
+ fullEnv.setProperty("queues.maximumDeliveryCount", 5);
_fullHostConf = new VirtualHostConfiguration("test", fullEnv);
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount());
+
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ /**
+ * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden
+ * at a broker or virtualhost level.
+ * @throws Exception
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
public void testGetMaximumMessageAge() throws ConfigurationException
{
// Check default value
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 9ee2ed3812..7739f9976e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
import java.util.Locale;
import org.apache.commons.configuration.ConfigurationException;
@@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase
ce.getMessage());
}
}
+
+ public void testMaxDeliveryCountDefault() throws Exception
+ {
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(0, serverConfig.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliveryCount() throws Exception
+ {
+ _config.setProperty("maximumDeliveryCount", 5);
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(5, serverConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Test XML configuration file correctly enables dead letter queues
+ */
+ public void testDeadLetterQueueConfigurationFile() throws Exception
+ {
+ // Write config
+ File xml = File.createTempFile(getClass().getName(), "xml");
+ xml.deleteOnExit();
+ FileWriter config = new FileWriter(xml);
+ config.write("<broker>\n");
+ writeSecurity(config);
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("<virtualhosts>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>test</name>\n");
+ config.write("<test>\n");
+ config.write("<queues>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("<queue>\n");
+ config.write("<name>biggles</name>\n");
+ config.write("<biggles>\n");
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("</biggles>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>beetle</name>\n");
+ config.write("<beetle />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</test>\n");
+ config.write("</virtualhost>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>extra</name>\n");
+ config.write("<extra>\n");
+ config.write("<queues>\n");
+ config.write("<queue>\n");
+ config.write("<name>r2d2</name>\n");
+ config.write("<r2d2>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("</r2d2>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>c3p0</name>\n");
+ config.write("<c3p0 />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</extra>\n");
+ config.write("</virtualhost>\n");
+ config.write("</virtualhosts>\n");
+ config.write("</broker>\n");
+ config.close();
+
+ // Load config
+ ApplicationRegistry.remove();
+ ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml);
+ ApplicationRegistry.initialise(registry);
+ ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration();
+
+ VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test");
+ assertNotNull("Host 'test' is not found", test);
+ VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra");
+ assertNotNull("Host 'extra' is not found", test);
+
+ QueueConfiguration biggles = test.getQueueConfiguration("biggles");
+ QueueConfiguration beetle = test.getQueueConfiguration("beetle");
+ QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2");
+ QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0");
+
+ // Validate config
+ assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled());
+ assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled());
+ assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled());
+ assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled());
+ assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled());
+ assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled());
+ assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled());
+ }
+
+ /**
+ * Convenience method to output required security preamble for broker config
+ */
+ private void writeSecurity(Writer out) throws Exception
+ {
+ out.write("\t<management><enabled>false</enabled></management>\n");
+ out.write("\t<security>\n");
+ out.write("\t\t<pd-auth-manager>\n");
+ out.write("\t\t\t<principal-database>\n");
+ out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n");
+ out.write("\t\t\t\t<attributes>\n");
+ out.write("\t\t\t\t\t<attribute>\n");
+ out.write("\t\t\t\t\t\t<name>passwordFile</name>\n");
+ out.write("\t\t\t\t\t\t<value>/dev/null</value>\n");
+ out.write("\t\t\t\t\t</attribute>\n");
+ out.write("\t\t\t\t</attributes>\n");
+ out.write("\t\t\t</principal-database>\n");
+ out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
+ out.write("\t\t</pd-auth-manager>\n");
+ out.write("\t</security>\n");
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index b133d53ac5..f6cd397217 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
@@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
@Override
- public void setUp() throws Exception
- {
- super.setUp();
- // Set the default configuration items
- getConfigXml().clear();
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test");
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
-
- getConfigXml().addProperty("virtualhosts.virtualhost.name", getName());
- getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
- }
-
- @Override
public void createBroker()
{
// Prevent auto broker startup
@@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
assertEquals(3, bTest.getMaximumMessageAge());
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+
+ // Enabled specifically
+ assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount());
+
+ // Enabled by test vhost default
+ assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount());
+
+ // Disabled specifically
+ assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount());
+ }
+
+ /**
+ * Tests the full set of configuration options for enabling DLQs in the broker configuration.
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+
+ getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+ VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra");
+
+ // Enabled specifically
+ assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled());
+ assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled());
+
+ // Enabled by test vhost default
+ assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled());
+
+ // Disabled specifically
+ assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled());
+
+ // Using broker default of disabled
+ assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled());
+ assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
+
+ // Get queues
+ AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
+ AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
+ AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
+ AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+
+ // Disabled specifically for this queue, overriding virtualhost setting
+ assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
+
+ // Enabled for all queues on the virtualhost
+ assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange());
+
+ // Enabled specifically for this queue, overriding the default broker setting of disabled
+ assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange());
+
+ // Disabled by the default broker setting
+ assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange());
+ }
+
/**
* Test that the house keeping pool sizes is correctly processed
*
@@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
vhost.getHouseKeepingTaskCount());
// Currently the two are tasks:
- // ExpiredMessageTask from VirtualHost
+ // ExpiredMessageTask from VirtualHost
// UpdateTask from the QMF ManagementExchange
}
@@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name",
"testdb");
-
+
try
{
super.createBroker();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b73987abf..ea2fe90da6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
return null;
}
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
};
if(action != null)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 27891289fb..2b7d1d7f26 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,39 +20,76 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQQueueFactoryTest extends InternalBrokerBaseCase
+public class AMQQueueFactoryTest extends QpidTestCase
{
QueueRegistry _queueRegistry;
VirtualHost _virtualHost;
- int _defaultQueueCount;
@Override
public void setUp() throws Exception
{
super.setUp();
- ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
- _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test");
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName());
+
+ _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName());
_queueRegistry = _virtualHost.getQueueRegistry();
- _defaultQueueCount = _queueRegistry.getQueues().size();
}
@Override
public void tearDown() throws Exception
{
- assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size());
- super.tearDown();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void verifyRegisteredQueueCount(int count)
+ {
+ assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
}
+ private void verifyQueueRegistered(String queueName)
+ {
+ assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ }
+
public void testPriorityQueueRegistration() throws Exception
{
FieldTable fieldTable = new FieldTable();
@@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase
false, _virtualHost, fieldTable);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ verifyQueueRegistered("testPriorityQueue");
+ verifyRegisteredQueueCount(1);
}
public void testSimpleQueueRegistration() throws Exception
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration");
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false,
false, _virtualHost, null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ verifyQueueRegistered("testSimpleQueueRegistration");
+
+ //verify that no alternate exchange or DLQ were produced
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+
+ assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
+ assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * cause the alternate exchange to be set and DLQ to be produced.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueEnabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration
+ * are not applied to the DLQ itself.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5");
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * result in the alternate exchange being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDisabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
+ assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName));
+
+ assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * creating an auto-delete queue, does not result in the alternate exchange
+ * being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ //create an autodelete queue
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false,
+ _virtualHost, fieldTable);
+ assertTrue("Queue should be autodelete", queue.isAutoDelete());
+
+ //ensure that the autodelete property overrides the request to enable DLQ
+ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
+ assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName));
+ assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * the desired effect.
+ */
+ public void testMaximumDeliveryCount() throws Exception
+ {
+ final FieldTable fieldTable = new FieldTable();
+ fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * that queue is created with a default maximumDeliveryCount of zero (unless set in config).
+ */
+ public void testMaximumDeliveryCountDefault() throws Exception
+ {
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests queue creation with queue name set to null
+ */
+ public void testQueueNameNullValidation()
+ {
+ try
+ {
+ AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null);
+ fail("queue with null name can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof IllegalArgumentException);
+ assertEquals("Queue name must not be null", e.getMessage());
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DLQ name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQUEUE");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLQ name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DL exchange name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQ");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLE name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ private String generateStringWithLength(char ch, int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append(ch);
+ }
+ return sb.toString();
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 8f3023f269..f70250132a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse(channel.getBlocking());
}
+ public void testMaximumDeliveryCount() throws IOException
+ {
+ assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount());
+ }
+
+ public void testViewAllMessages() throws Exception
+ {
+ final int messageCount = 5;
+ sendPersistentMessages(messageCount);
+
+
+ final TabularData messageTable = _queueMBean.viewMessages(1L, 5L);
+ assertNotNull("Message table should not be null", messageTable);
+ assertEquals("Unexpected number of rows", messageCount, messageTable.size());
+
+
+ final Iterator rowIterator = messageTable.values().iterator();
+ // Get its message ID
+ final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next();
+ final Long msgId = (Long) row1.get("AMQ MessageId");
+ final Long queuePosition = (Long) row1.get("Queue Position");
+ final Integer deliveryCount = (Integer) row1.get("Delivery Count");
+
+ assertNotNull("Row should have value for queue position", queuePosition);
+ assertNotNull("Row should have value for msgid", msgId);
+ assertNotNull("Row should have value for deliveryCount", deliveryCount);
+ }
+
@Override
public void setUp() throws Exception
@@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ApplicationRegistry.remove();
}
+ private void sendPersistentMessages(int messageCount) throws AMQException
+ {
+ sendMessages(messageCount, true);
+ assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean
+ .getMessageCount().intValue());
+ }
+
private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
{
return sendMessages(messageCount, persistent, 0l, 0l);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 4c31092983..0daf79122c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue
{
}
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setMaximumDeliveryCount(int maximumDeliveryCount)
+ {
+ }
+
+ @Override
+ public void setAlternateExchange(String exchangeName)
+ {
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 864b9ad368..7ad002c248 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry
return null;
}
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ad7885f195..6879fe0cfd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -284,7 +284,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
- _logger.debug("AMQP version " + amqpVersion);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("AMQP version " + amqpVersion);
+ }
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
@@ -1485,4 +1488,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _lastFailoverTime;
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index f9a38138ba..1df809c67c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -75,6 +75,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _exchangeExistsChecked;
+ private RejectBehaviour _rejectBehaviour;
+
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -227,6 +229,8 @@ public abstract class AMQDestination implements Destination, Referenceable
_queueName = binding.getQueueName() == null ? null : binding.getQueueName();
_routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey();
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
+ final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
+ _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -294,7 +298,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
_destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
-
+ _rejectBehaviour = null;
if (_logger.isDebugEnabled())
{
_logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
@@ -499,6 +503,13 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+ if (_rejectBehaviour != null)
+ {
+ sb.append(BindingURL.OPTION_REJECT_BEHAVIOUR);
+ sb.append("='" + _rejectBehaviour + "'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
//removeKey the last char '?' if there is no options , ',' if there are.
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
@@ -842,4 +853,19 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return _addressResolved.get() > time;
}
+
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ *
+ * @return destination reject behaviour
+ */
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ef44221ec1..8984b7ca8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -310,7 +310,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the highest received delivery tag. */
protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
+
+ /** Pre-fetched message tags */
+ protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -2925,11 +2928,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_producers.put(new Long(producerId), producer);
}
- private void rejectAllMessages(boolean requeue)
- {
- rejectMessagesForConsumerTag(0, requeue, true);
- }
-
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
@@ -3235,7 +3233,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : _consumers.values())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _unacknowledgedMessageTags.addAll(tags);
+ _prefetchedMessageTags.addAll(tags);
}
setConnectionStopped(isStopped);
@@ -3345,7 +3343,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else if (_usingDispatcherForCleanup)
{
- _unacknowledgedMessageTags.add(deliveryTag);
+ _prefetchedMessageTags.add(deliveryTag);
}
else
{
@@ -3548,4 +3546,5 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug("Rollback mark is set to " + _rollbackMark.get());
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index e33410f5fe..96df463481 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
+import java.util.ArrayList;
import java.util.Map;
import javax.jms.Destination;
@@ -40,7 +41,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
@@ -62,7 +62,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
@@ -223,6 +222,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void sendRecover() throws AMQException, FailoverException
{
+ enforceRejectBehaviourDuringRecover();
+ _prefetchedMessageTags.clear();
_unacknowledgedMessageTags.clear();
if (isStrictAMQP())
@@ -259,6 +260,49 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
}
+ private void enforceRejectBehaviourDuringRecover()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ }
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ boolean messageListenerFound = false;
+ boolean serverRejectBehaviourFound = false;
+ for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
+ {
+ if (consumer.isMessageListenerSet())
+ {
+ messageListenerFound = true;
+ }
+ if (RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ serverRejectBehaviourFound = true;
+ }
+ }
+ _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
+
+ if (serverRejectBehaviourFound)
+ {
+ //reject(false) any messages we don't want returned again
+ switch(_acknowledgeMode)
+ {
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!messageListenerFound)
+ {
+ break;
+ }
+ case Session.CLIENT_ACKNOWLEDGE:
+ for(Long tag : _unacknowledgedMessageTags)
+ {
+ rejectMessage(tag, false);
+ }
+ break;
+ }
+ }
+ }
+
public void releaseForRollback()
{
// Reject all the messages that have been received in this session and
@@ -267,6 +311,17 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
// Otherwise messages will be able to arrive out of order to a second
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
+ boolean normalRejectBehaviour = true;
+ for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ {
+ if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ normalRejectBehaviour = false;
+ //no need to consult other consumers now, found server behaviour.
+ break;
+ }
+ }
+
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -275,13 +330,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
break;
}
- rejectMessage(tag, true);
+ rejectMessage(tag, normalRejectBehaviour);
}
}
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
+ ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 7bb400fada..c6e5fbb019 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -147,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private List<StackTraceElement> _closedStack = null;
-
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -211,6 +210,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
_arguments = ft;
+
}
public AMQDestination getDestination()
@@ -814,31 +814,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- *
- * @return the lastDeliveryTag to acknowledge
- */
- Long getLastDelivered()
- {
- if (!_receivedDeliveryTags.isEmpty())
- {
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
- }
-
- return null;
- }
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cf1d7cedeb..efcbfd5532 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final RejectBehaviour _rejectBehaviour;
+
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
@@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ if (destination.getRejectBehaviour() != null)
+ {
+ _rejectBehaviour = destination.getRejectBehaviour();
+ }
+ else
+ {
+ ConnectionURL connectionURL = connection.getConnectionURL();
+ String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
+ if (rejectBehaviour != null)
+ {
+ _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
+ }
+ else
+ {
+ // use the default value for all connections, if not set
+ rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
+ _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
+ }
+ }
}
void sendCancel() throws AMQException, FailoverException
@@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
}
+
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java
new file mode 100644
index 0000000000..e3c958044e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+/**
+ * This enum can be used only with for 0-8/0-9/0-9-1 protocols connections to notify
+ * the client to delegate the requeue/DLQ decision to the server
+ * if <code>SERVER</server> value is specified. Otherwise the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+public enum RejectBehaviour
+{
+ NORMAL, SERVER;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 26641982d7..24d9360cfa 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -41,7 +41,16 @@ public interface ConnectionURL
public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+ public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
new file mode 100644
index 0000000000..3a565f0f0d
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQInvalidArgumentException;
+
+public class AMQConnectionUnitTest extends TestCase
+{
+
+ public void testExceptionReceived()
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null);
+ final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>();
+ try
+ {
+ MockAMQConnection connection = new MockAMQConnection(url);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+
+ @Override
+ public void onException(JMSException jmsException)
+ {
+ receivedException.set(jmsException);
+ }
+ });
+ connection.exceptionReceived(expectedException);
+ }
+ catch (Exception e)
+ {
+ fail("Failure to test exceptionRecived:" + e.getMessage());
+ }
+ JMSException exception = receivedException.get();
+ assertNotNull("Expected JMSException but got null", exception);
+ assertEquals("JMSException error code is incorrect", Integer.toString(expectedException.getErrorCode().getCode()), exception.getErrorCode());
+ assertNotNull("Expected not null message for JMSException", exception.getMessage());
+ assertTrue("JMSException error message is incorrect", exception.getMessage().contains(expectedException.getMessage()));
+ assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException());
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
new file mode 100644
index 0000000000..d8d94ba40e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Session;
+
+import org.apache.qpid.test.unit.message.TestAMQSession;
+import org.apache.qpid.url.AMQBindingURL;
+
+import junit.framework.TestCase;
+
+public class BasicMessageConsumer_0_8_Test extends TestCase
+{
+ /**
+ * Test that if there is a value for Reject Behaviour specified for the Destination
+ * used to create the Consumer, it overrides the value for the Connection.
+ */
+ public void testDestinationRejectBehaviourOverridesDefaultConnection() throws Exception
+ {
+ /*
+ * Check that when the connection does not have a value applied that this
+ * is successfully overridden with a specific value by the consumer.
+ */
+ String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
+ }
+
+ /**
+ * Check that when the connection does have a specific value applied that this
+ * is successfully overridden with another specific value by the consumer.
+ */
+ public void testDestinationRejectBehaviourSpecified() throws Exception
+ {
+ final String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'";
+ final AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ final String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='normal'";
+ final AMQBindingURL burl = new AMQBindingURL(url);
+ final AMQDestination queue = new AMQQueue(burl);
+
+ final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
+ }
+
+ /**
+ * Test that if no value for Reject Behaviour is applied to the Destination, then the value
+ * from the connection is used and acts as expected.
+ */
+ public void testRejectBehaviourDetectedFromConnection() throws Exception
+ {
+ /*
+ * Check that when the connection does have a specific value applied that this
+ * is successfully detected by the consumer.
+ */
+ String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='normal'";
+ AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ assertNull("Reject behaviour should have been null", queue.getRejectBehaviour());
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
+ }
+
+
+ protected RejectBehaviour getRejectBehaviour(AMQDestination destination)
+ {
+ return destination.getRejectBehaviour();
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
index 73e67469ae..919809edc3 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
@@ -55,4 +55,9 @@ public class MockAMQConnection extends AMQConnection
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN);
return null;
}
+
+ public AMQConnectionDelegate getDelegate()
+ {
+ return _delegate;
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 4624b36fea..5a5a3a0bd9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ public class ConnectionURLTest extends TestCase
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
- assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
+ assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -338,7 +338,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getPassword().equals("pass"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getClientName().equals("client_id"));
-
+
assertTrue(connectionurl.getBrokerCount() == 1);
}
@@ -457,7 +457,6 @@ public class ConnectionURLTest extends TestCase
assertTrue(service.getTransport().equals("tcp"));
-
assertTrue(service.getHost().equals("localhost"));
assertTrue(service.getPort() == 5672);
assertEquals("jim",service.getProperty("foo"));
@@ -468,7 +467,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getOption("timeout").equals("200"));
assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
}
-
+
/**
* Test that options other than failover and brokerlist are returned in the string representation.
* <p>
@@ -477,7 +476,7 @@ public class ConnectionURLTest extends TestCase
public void testOptionToString() throws Exception
{
ConnectionURL url = new AMQConnectionURL("amqp://user:pass@temp/test?maxprefetch='12345'&brokerlist='tcp://localhost:5672'");
-
+
assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
}
@@ -493,10 +492,10 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getBrokerCount() == 1);
BrokerDetails service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 6672);
-
+
url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
connectionurl = new AMQConnectionURL(url);
@@ -507,11 +506,44 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getBrokerCount() == 1);
service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 5672);
}
-
+
+
+ public void testRejectBehaviourPresent() throws Exception
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'";
+
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue(connectionURL.getFailoverMethod() == null);
+ assertTrue(connectionURL.getUsername().equals("guest"));
+ assertTrue(connectionURL.getPassword().equals("guest"));
+ assertTrue(connectionURL.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is returned as expected
+ assertEquals("Reject behaviour option was not as expected", "server",
+ connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
+ public void testRejectBehaviourNotPresent() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is null as expected
+ assertNull("Reject behaviour option was not as expected",
+ connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
index 7de09cff45..2c32e4c559 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
@@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.client.destinationurl;
import junit.framework.TestCase;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,6 +193,67 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+ public void testRejectBehaviourPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ //check that the MaxDeliveryCount property has the right value
+ assertEquals("server",burl.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR));
+
+ //check that the MaxDeliveryCount value is correctly returned from an AMQDestination
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertEquals("Reject behaviour is unexpected", RejectBehaviour.SERVER, dest.getRejectBehaviour());
+ }
+
+ public void testRejectBehaviourNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DestinationURLTest.class);
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 88e2fb0176..a36e7c214e 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -108,6 +108,14 @@ public class ClientProperties
public static final String QPID_TCP_NODELAY_PROP_NAME = "qpid.tcp_nodelay";
public static final String AMQJ_TCP_NODELAY_PROP_NAME = "amqj.tcp_nodelay";
+ /**
+ * System property to set the reject behaviour. default value will be 'normal' but can be
+ * changed to 'server' in which case the server decides whether a message should be requeued
+ * or dead lettered.
+ * This can be overridden by the more specific settings at connection or binding URL level.
+ */
+ public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour";
+
/*
public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 9996fff311..0e6c865a16 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -37,6 +37,15 @@ public interface BindingURL
public static final String OPTION_ROUTING_KEY = "routingkey";
public static final String OPTION_BINDING_KEY = "bindingkey";
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+ public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour";
+
String getURL();
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
index b5c80a4fed..b74342df1f 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
@@ -23,6 +23,7 @@ package org.apache.qpid.management.common.mbeans;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -118,6 +119,24 @@ public interface ManagedBroker
throws IOException, JMException, MBeanException;
/**
+ * Create a new Queue in the VirtualHost
+ *
+ * @since Qpid JMX API 2.4
+ * @param queueName name of the new queue
+ * @param durable true if the queue should be durable
+ * @param owner owner
+ * @param arguments declaration arguments for use when creating the queue, may be null.
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="createNewQueue", description="Create a new Queue in the VirtualHost", impact= MBeanOperationInfo.ACTION)
+ void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
+ @MBeanOperationParameter(name="owner", description="Owner name")String owner,
+ @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
+ @MBeanOperationParameter(name="arguments", description="Map of arguments")Map<String,Object> arguments)
+ throws IOException, JMException;
+
+ /**
* Unregisters the Queue bindings, removes the subscriptions and unregisters
* from the managed objects.
*
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
index be31d8ef88..c23a0f5076 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
@@ -50,7 +50,8 @@ public interface ManagedQueue
String MSG_SIZE = "Size(bytes)";
String MSG_REDELIVERED = "Redelivered";
String MSG_QUEUE_POS = "Queue Position";
- List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS));
+ String MSG_DELIVERY_COUNT = "Delivery Count";
+ List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS, MSG_DELIVERY_COUNT));
List<String> VIEW_MSGS_TABULAR_UNIQUE_INDEX = Collections.unmodifiableList(Arrays.asList(MSG_QUEUE_POS));
//CompositeType key/description information for message content
@@ -67,6 +68,7 @@ public interface ManagedQueue
static final String ATTR_MAX_MSG_COUNT = "MaximumMessageCount";
static final String ATTR_MAX_QUEUE_DEPTH = "MaximumQueueDepth";
static final String ATTR_MAX_MSG_SIZE = "MaximumMessageSize";
+ static final String ATTR_MAXIMUM_DELIVERY_COUNT = "MaximumDeliveryCount";
static final String ATTR_DURABLE = "Durable";
static final String ATTR_AUTODELETE = "AutoDelete";
static final String ATTR_CONSUMER_COUNT = "ConsumerCount";
@@ -78,7 +80,8 @@ public interface ManagedQueue
static final String ATTR_FLOW_OVERFULL = "FlowOverfull";
static final String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity";
static final String ATTR_EXCLUSIVE = "Exclusive";
-
+ static final String ATTR_ALT_EXCHANGE = "AlternateExchange";
+
//All attribute names constant
static final List<String> QUEUE_ATTRIBUTES
= Collections.unmodifiableList(
@@ -91,6 +94,7 @@ public interface ManagedQueue
ATTR_MAX_MSG_COUNT,
ATTR_MAX_QUEUE_DEPTH,
ATTR_MAX_MSG_SIZE,
+ ATTR_MAXIMUM_DELIVERY_COUNT,
ATTR_DURABLE,
ATTR_AUTODELETE,
ATTR_CONSUMER_COUNT,
@@ -101,7 +105,9 @@ public interface ManagedQueue
ATTR_CAPACITY,
ATTR_FLOW_OVERFULL,
ATTR_FLOW_RESUME_CAPACITY,
- ATTR_EXCLUSIVE))));
+ ATTR_EXCLUSIVE,
+ ATTR_ALT_EXCHANGE
+ ))));
/**
* Returns the Name of the ManagedQueue.
@@ -120,6 +126,16 @@ public interface ManagedQueue
Integer getMessageCount() throws IOException;
/**
+ * Maximum number of times a message is permitted to be delivered or zero if not enforced.
+ *
+ * @since Qpid JMX API 2.4
+ * @return maximum delivery count
+ * @throws IOException
+ */
+ @MBeanAttribute(name="MaximumDeliveryCount", description = "Maximum number of times a message is permitted to be delivered or zero if not enforced")
+ Integer getMaximumDeliveryCount() throws IOException;
+
+ /**
* Tells the total number of messages receieved by the queue since startup.
* @return total number of messages received.
* @throws IOException
@@ -309,7 +325,7 @@ public interface ManagedQueue
/**
* Sets whether the queue is exclusive or not.
- *
+ *
* @since Qpid JMX API 2.0
* @param exclusive the capacity in bytes
* @throws IOException
@@ -318,6 +334,25 @@ public interface ManagedQueue
@MBeanAttribute(name="Exclusive", description="Whether the queue is Exclusive or not")
void setExclusive(boolean exclusive) throws IOException, JMException;
+ /**
+ * Sets the Alternate Exchange for the queue, for use in dead letter queue functionality.
+ *
+ * @since Qpid JMX API 2.4
+ * @param the name of the exchange to use. Specifying null or the empty string will clear the alternate exchange.
+ * @throws IOException
+ */
+ void setAlternateExchange(String exchangeName) throws IOException;
+
+ /**
+ * Returns the name of the Alternate Exchange for the queue, or null if there isn't one.
+ *
+ * @since Qpid JMX API 2.4
+ * @return the name of the Alternate Exchange for the queue, or null if there isn't one
+ * @throws IOException
+ */
+ @MBeanAttribute(name="AlternateExchange", description="Alternate exchange for the queue")
+ String getAlternateExchange() throws IOException;
+
//********** Operations *****************//
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index 12ae69571e..9d40edd8d0 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -47,7 +47,7 @@ public interface ServerInformation
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 2;
- int QPID_JMX_API_MINOR_VERSION = 3;
+ int QPID_JMX_API_MINOR_VERSION = 4;
/**
diff --git a/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java b/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java
index 1a4a73f207..c931b921df 100644
--- a/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java
+++ b/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.management.common.mbeans;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanAttributeInfo;
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
index 4a59176374..faa6769c63 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
@@ -47,7 +47,7 @@ public abstract class ApplicationRegistry
//max supported broker management interface supported by this release of the management console
public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2;
- public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3;
+ public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4;
public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
index 390a7b55e4..97ba9afc32 100644
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.qpid.management.jmx;
+import java.util.Collections;
+import java.util.Map;
+
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.ObjectName;
@@ -25,6 +28,8 @@ import javax.management.ObjectName;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -136,4 +141,26 @@ public class ManagedBrokerMBeanTest extends QpidBrokerTestCase
final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName);
assertNotNull("Exchange should exist", defaultExchange);
}
+
+ /**
+ * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests
+ * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}.
+ */
+ public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception
+ {
+ final String queueName = getName();
+ final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+
+ final Integer deliveryCount = 1;
+ final Map<String, Object> args = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount);
+ managedBroker.createNewQueue(queueName, "testowner", true, args);
+
+ // Ensure the queue exists
+ assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
+
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount());
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
new file mode 100644
index 0000000000..e59dac8c01
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -0,0 +1,695 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Test that the MaxRedelivery feature works as expected, allowing the client to reject
+ * messages during rollback/recover whilst specifying they not be requeued if delivery
+ * to an application has been attempted a specified number of times.
+ *
+ * General approach: specify a set of messages which will cause the test client to then
+ * deliberately rollback/recover the session after consuming, and monitor that they are
+ * re-delivered the specified number of times before the client rejects them without requeue
+ * and then verify that they are not subsequently redelivered.
+ *
+ * Additionally, the queue used in the test is configured for DLQ'ing, and the test verifies
+ * that the messages rejected without requeue are then present on the appropriate DLQ.
+ */
+public class MaxDeliveryCountTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class);
+ private boolean _failed;
+ private String _failMsg;
+ private static final int MSG_COUNT = 15;
+ private static final int MAX_DELIVERY_COUNT = 2;
+ private CountDownLatch _awaitCompletion;
+
+ public void setUp() throws Exception
+ {
+ //enable DLQ/maximumDeliveryCount support for all queues at the vhost level
+ setConfigurationProperty("virtualhosts.virtualhost.test.queues.maximumDeliveryCount",
+ String.valueOf(MAX_DELIVERY_COUNT));
+ setConfigurationProperty("virtualhosts.virtualhost.test.queues.deadLetterQueues",
+ String.valueOf(true));
+
+ //Ensure management is on
+ setConfigurationProperty("management.enabled", "true");
+ setConfigurationProperty("management.ssl.enabled", "false");
+
+ // Set client-side flag to allow the server to determine if messages
+ // dead-lettered or requeued.
+ setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, "server");
+
+ super.setUp();
+
+ boolean durableSub = isDurSubTest();
+
+ //declare the test queue
+ Connection consumerConnection = getConnection();
+ Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination destination = getDestination(consumerSession, durableSub);
+ if(durableSub)
+ {
+ consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+ }
+ else
+ {
+ consumerSession.createConsumer(destination).close();
+ }
+
+ consumerConnection.close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = getConnection();
+ producerConnection.start();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
+
+ for (int count = 1; count <= MSG_COUNT; count++)
+ {
+ Message msg = producerSession.createTextMessage(generateContent(count));
+ msg.setIntProperty("count", count);
+ producer.send(msg);
+ }
+
+ producerConnection.close();
+
+ _failed = false;
+ _awaitCompletion = new CountDownLatch(1);
+ }
+
+ private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
+ {
+ if(durableSub)
+ {
+ return consumerSession.createTopic(getTestQueueName());
+ }
+ else
+ {
+ return consumerSession.createQueue(getTestQueueName());
+ }
+ }
+
+ private String generateContent(int count)
+ {
+ return "Message " + count + " content.";
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * Client-Ack session.
+ */
+ public void testAsynchronousClientAckSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * transacted session.
+ */
+ public void testAsynchronousTransactedSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on an
+ * Auto-Ack session.
+ */
+ public void testAsynchronousAutoAckSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * Dups-OK session.
+ */
+ public void testAsynchronousDupsOkSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using recieve() on a
+ * Client-Ack session.
+ */
+ public void testSynchronousClientAckSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(3);
+ redeliverMsgs.add(14);
+
+ doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using recieve() on a
+ * transacted session.
+ */
+ public void testSynchronousTransactedSession() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false);
+ }
+
+ public void testDurableSubscription() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true);
+ }
+
+ public void doTest(final int deliveryMode, final ArrayList<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception
+ {
+ final Connection clientConnection = getConnection();
+
+ final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
+ final Session clientSession = clientConnection.createSession(transacted, deliveryMode);
+
+ MessageConsumer consumer;
+ Destination dest = getDestination(clientSession, durableSub);
+ AMQQueue checkQueue;
+ if(durableSub)
+ {
+ consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
+ checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(dest);
+ checkQueue = (AMQQueue) dest;
+ }
+
+ assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
+ MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
+
+ clientConnection.start();
+
+ int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size());
+
+ if(synchronous)
+ {
+ doSynchronousTest(clientSession, consumer, clientSession.getAcknowledgeMode(),
+ MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs);
+ }
+ else
+ {
+ addMessageListener(clientSession, consumer, clientSession.getAcknowledgeMode(),
+ MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs);
+
+ try
+ {
+ if (!_awaitCompletion.await(20, TimeUnit.SECONDS))
+ {
+ fail("Test did not complete in 20 seconds.");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to wait for test completion");
+ throw e;
+ }
+
+ if(_failed)
+ {
+ fail(_failMsg);
+ }
+ }
+ consumer.close();
+
+ //check the source queue is now empty
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
+
+ //check the DLQ has the required number of rejected-without-requeue messages
+ verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
+
+ if(isBrokerStorePersistent())
+ {
+ //restart the broker to verify persistence of the DLQ and the messages on it
+ clientConnection.close();
+
+ restartBroker();
+
+ final Connection clientConnection2 = getConnection();
+ final Session clientSession2 = clientConnection2.createSession(transacted, deliveryMode);
+ clientConnection2.start();
+
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection2.close();
+ }
+ else
+ {
+
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection.close();
+ }
+
+ }
+
+ private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException
+ {
+ AMQDestination checkQueueDLQ;
+ if(durableSub)
+ {
+ checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ else
+ {
+ checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
+ assertEquals("The DLQ should have " + expected + " msgs on it", expected,
+ ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ));
+ }
+
+ private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
+ {
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer;
+ if(durableSub)
+ {
+ if (isBroker010())
+ {
+ consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ }
+ else
+ {
+ consumer = clientSession.createDurableSubscriber(clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(
+ clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ }
+
+ //keep track of the message we expect to still be on the DLQ
+ List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
+ int numMsg = outstandingMessages.size();
+
+ for(int i = 0; i < numMsg; i++)
+ {
+ Message message = consumer.receive(250);
+
+ assertNotNull("failed to consume expected message " + i + " from DLQ", message);
+ assertTrue("message " + i + " was the wrong type", message instanceof TextMessage);
+
+ //using Integer here to allow removing the value from the list, using int
+ //would instead result in removal of the element at that index
+ Integer msgId = message.getIntProperty("count");
+
+ TextMessage txt = (TextMessage) message;
+ _logger.info("Received message " + msgId + " at " + i + " from the DLQ: " + txt.getText());
+
+ assertTrue("message " + i + " was not one of those which should have been on the DLQ",
+ redeliverMsgs.contains(msgId));
+ assertTrue("message " + i + " was not one of those expected to still be on the DLQ",
+ outstandingMessages.contains(msgId));
+ assertEquals("Message " + i + " content was not as expected", generateContent(msgId), txt.getText());
+
+ //remove from the list of outstanding msgs
+ outstandingMessages.remove(msgId);
+ }
+
+ if(outstandingMessages.size() > 0)
+ {
+ String failures = "";
+ for(Integer msg : outstandingMessages)
+ {
+ failures = failures.concat(msg + " ");
+ }
+ fail("some DLQ'd messages were not found on the DLQ: " + failures);
+ }
+ }
+
+ private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
+ final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException
+ {
+ if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE)
+ {
+ failAsyncTest("Max Delivery feature is not supported with this acknowledgement mode" +
+ "when using asynchronous message delivery.");
+ }
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _deliveryAttempts = 0; //number of times given message(s) have been seen
+ private int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover
+ private int _totalNumDeliveries = 0;
+ private int _expectedMessage = 1;
+
+ public void onMessage(Message message)
+ {
+ if(_failed || _awaitCompletion.getCount() == 0L)
+ {
+ //don't process anything else
+ return;
+ }
+
+ _totalNumDeliveries++;
+
+ if (message == null)
+ {
+ failAsyncTest("Should not get null messages");
+ return;
+ }
+
+ try
+ {
+ int msgId = message.getIntProperty("count");
+
+ _logger.info("Received message: " + msgId);
+
+ //check the message is the one we expected
+ if(_expectedMessage != msgId)
+ {
+ failAsyncTest("Expected message " + _expectedMessage + " , got message " + msgId);
+ return;
+ }
+
+ _expectedMessage++;
+
+ //keep track of the overall deliveries to ensure we don't see more than expected
+ if(_totalNumDeliveries > expectedTotalNumberOfDeliveries)
+ {
+ failAsyncTest("Expected total of " + expectedTotalNumberOfDeliveries +
+ " message deliveries, reached " + _totalNumDeliveries);
+ }
+
+ //check if this message is one chosen to be rolled back / recovered
+ if(redeliverMsgs.contains(msgId))
+ {
+ _numMsgsToBeRedelivered++;
+
+ //check if next message is going to be rolled back / recovered too
+ if(redeliverMsgs.contains(msgId +1))
+ {
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ //skip on to next message immediately
+ return;
+ case Session.CLIENT_ACKNOWLEDGE:
+ //skip on to next message immediately
+ return;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ //must recover session now or onMessage will ack, so
+ //just fall through the if
+ break;
+ }
+ }
+
+ _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+
+ _logger.debug("ROLLBACK/RECOVER");
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.rollback();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ session.recover();
+ break;
+ }
+
+ if( _deliveryAttempts >= maxDeliveryCount)
+ {
+ //the client should have rejected the latest messages upon then
+ //above recover/rollback, adjust counts to compensate
+ _deliveryAttempts = 0;
+ }
+ else
+ {
+ //the message(s) should be redelivered, adjust expected message
+ _expectedMessage -= _numMsgsToBeRedelivered;
+ }
+ _logger.debug("XXX _expectedMessage: " + _expectedMessage + " _deliveryAttempts : " + _deliveryAttempts + " _numMsgsToBeRedelivered=" + _numMsgsToBeRedelivered);
+ //reset count of messages expected to be redelivered
+ _numMsgsToBeRedelivered = 0;
+ }
+ else
+ {
+ //consume the message
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.commit();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ message.acknowledge();
+ break;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall-through
+ case Session.AUTO_ACKNOWLEDGE:
+ //do nothing, onMessage will ack on exit.
+ break;
+ }
+ }
+
+ if (msgId == MSG_COUNT)
+ {
+ //if this is the last message let the test complete.
+ if (expectedTotalNumberOfDeliveries == _totalNumDeliveries)
+ {
+ _awaitCompletion.countDown();
+ }
+ else
+ {
+ failAsyncTest("Last message received, but we have not had the " +
+ "expected number of total delivieres. Received " + _totalNumDeliveries + " Expecting : " + expectedTotalNumberOfDeliveries);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ failAsyncTest(e.getMessage());
+ }
+ }
+ });
+ }
+
+ private void failAsyncTest(String msg)
+ {
+ _logger.error("Failing test because: " + msg);
+ _failMsg = msg;
+ _failed = true;
+ _awaitCompletion.countDown();
+ }
+
+ private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
+ final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException
+ {
+ if(deliveryMode == Session.AUTO_ACKNOWLEDGE
+ || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ {
+ fail("Max Delivery feature is not supported with this acknowledgement mode" +
+ "when using synchronous message delivery.");
+ }
+
+ int _deliveryAttempts = 0; //number of times given message(s) have been seen
+ int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover
+ int _totalNumDeliveries = 0;
+ int _expectedMessage = 1;
+
+ while(!_failed)
+ {
+ Message message = consumer.receive(1000);
+
+ _totalNumDeliveries++;
+
+ if (message == null)
+ {
+ fail("Should not get null messages");
+ return;
+ }
+
+ try
+ {
+ int msgId = message.getIntProperty("count");
+
+ _logger.info("Received message: " + msgId);
+
+ //check the message is the one we expected
+ assertEquals("Unexpected message.", _expectedMessage, msgId);
+
+ _expectedMessage++;
+
+ //keep track of the overall deliveries to ensure we don't see more than expected
+ assertTrue("Exceeded expected total number of deliveries.",
+ _totalNumDeliveries <= expectedTotalNumberOfDeliveries );
+
+ //check if this message is one chosen to be rolled back / recovered
+ if(redeliverMsgs.contains(msgId))
+ {
+ //keep track of the number of messages we will have redelivered
+ //upon rollback/recover
+ _numMsgsToBeRedelivered++;
+
+ if(redeliverMsgs.contains(msgId +1))
+ {
+ //next message is going to be rolled back / recovered too.
+ //skip ahead to it
+ continue;
+ }
+
+ _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.rollback();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ session.recover();
+
+ //sleep then do a synchronous op to give the broker
+ //time to resend all the messages
+ Thread.sleep(500);
+ ((AMQSession) session).sync();
+ break;
+ }
+
+ if( _deliveryAttempts >= maxDeliveryCount)
+ {
+ //the client should have rejected the latest messages upon then
+ //above recover/rollback, adjust counts to compensate
+ _deliveryAttempts = 0;
+ }
+ else
+ {
+ //the message(s) should be redelivered, adjust expected message
+ _expectedMessage -= _numMsgsToBeRedelivered;
+ }
+
+ //As we just rolled back / recovered, we must reset the
+ //count of messages expected to be redelivered
+ _numMsgsToBeRedelivered = 0;
+ }
+ else
+ {
+ //consume the message
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.commit();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ message.acknowledge();
+ break;
+ }
+ }
+
+ if (msgId == MSG_COUNT)
+ {
+ //if this is the last message let the test complete.
+ assertTrue("Last message received, but we have not had the " +
+ "expected number of total delivieres",
+ expectedTotalNumberOfDeliveries == _totalNumDeliveries);
+
+ break;
+ }
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ private boolean isDurSubTest()
+ {
+ return getTestQueueName().contains("DurableSubscription");
+ }
+}
diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes
index da702f96cf..eb130b27b7 100755
--- a/java/test-profiles/CPPExcludes
+++ b/java/test-profiles/CPPExcludes
@@ -57,6 +57,9 @@ org.apache.qpid.server.queue.ConflationQueueTest#*
// the 0-10 c++ broker does not implement sorted queues
org.apache.qpid.server.queue.SortedQueueTest#*
+// the 0-10 c++ broker does not implement DLQ
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
+
//this test checks explicitly for 0-8 flow control semantics
org.apache.qpid.test.client.FlowControlTest#*