diff options
author | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
commit | 4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (patch) | |
tree | 8f5a5c8e728615f6442f9e317518817f15a3ee74 /java | |
parent | 907330f70818a437f7a0723743ab98b355d80d67 (diff) | |
download | qpid-python-4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd.tar.gz |
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
51 files changed, 2610 insertions, 172 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 76b6dad996..01a0d9900d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -20,8 +20,8 @@ package org.apache.qpid.server; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanException; @@ -30,6 +30,7 @@ import javax.management.ObjectName; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; @@ -243,7 +244,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr */ public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException { - AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); + createNewQueue(queueName, owner, durable, null); + } + + public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException + { + final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName); + AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); if (queue != null) { throw new JMException("The queue \"" + queueName + "\" already exists."); @@ -258,11 +265,18 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr ownerShortString = new AMQShortString(owner); } + FieldTable args = null; + if(arguments != null) + { + args = FieldTable.convertToFieldTable(arguments); + } final VirtualHost virtualHost = getVirtualHost(); - queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, virtualHost, null); + + queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString, + false, false, getVirtualHost(), args); if (queue.isDurable() && !queue.isAutoDelete()) { - _durableConfig.createQueue(queue); + _durableConfig.createQueue(queue, args); } virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 34bc57a826..a4fd997568 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,7 +22,6 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -692,6 +693,31 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } + public boolean isMaxDeliveryCountEnabled(final long deliveryTag) + { + final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + if (queueEntry != null) + { + final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + return maximumDeliveryCount > 0; + } + + return false; + } + + public boolean isDeliveredTooManyTimes(final long deliveryTag) + { + final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + if (queueEntry != null) + { + final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int numDeliveries = queueEntry.getDeliveryCount(); + return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; + } + + return false; + } + /** * Called to resend all outstanding unacknowledged messages to this same channel. * @@ -739,9 +765,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); + //Amend the delivery counter as the client hasn't seen these messages yet. + message.decrementDeliveryCount(); - - ServerMessage msg = message.getMessage(); AMQQueue queue = message.getQueue(); // Our Java Client will always suspend the channel when resending! @@ -799,6 +825,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); + + //Amend the delivery counter as the client hasn't seen these messages yet. + message.decrementDeliveryCount(); + _unacknowledgedMessageMap.remove(deliveryTag); message.setRedelivered(); @@ -1058,6 +1088,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _session.registerMessageDelivered(entry.getMessage().getSize()); getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); + entry.incrementDeliveryCount(); } }; @@ -1246,7 +1277,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { private final Collection<QueueEntry> _ackedMessages; - public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) { _ackedMessages = ackedMessages; @@ -1479,4 +1509,54 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } } + + public void deadLetter(long deliveryTag) throws AMQException + { + final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); + final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + + if (rejectedQueueEntry == null) + { + _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); + return; + } + else + { + final ServerMessage msg = rejectedQueueEntry.getMessage(); + + final AMQQueue queue = rejectedQueueEntry.getQueue(); + + final Exchange altExchange = queue.getAlternateExchange(); + unackedMap.remove(deliveryTag); + + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + rejectedQueueEntry.discard(); + return; + } + + final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); + + final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m); + + if (destinationQueues == null || destinationQueues.isEmpty()) + { + _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + rejectedQueueEntry.discard(); + return; + } + + rejectedQueueEntry.routeToAlternate(); + + //output operational logging for each delivery post commit + for (final BaseQueue destinationQueue : destinationQueues) + { + _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); + } + + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 31c683b548..b8437c8430 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -63,7 +63,9 @@ public class QueueConfiguration extends ConfigurationPlugin "flowResumeCapacity", "lvq", "lvqKey", - "sortKey" + "sortKey", + "maximumDeliveryCount", + "deadLetterQueues" }; } @@ -173,6 +175,19 @@ public class QueueConfiguration extends ConfigurationPlugin return getStringValue("sortKey", null); } + public int getMaxDeliveryCount() + { + return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount()); + } + + /** + * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled()); + } + public static class QueueConfig extends ConfigurationPlugin { @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e13e2f4d8f..4b42e39aa1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -40,6 +40,8 @@ import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.signal.SignalHandlerTask; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -810,4 +812,34 @@ public class ServerConfiguration extends ConfigurationPlugin { return getBooleanValue("management.managementRightsInferAllAccess", true); } + + public int getMaxDeliveryCount() + { + return getConfig().getInt("maximumDeliveryCount", 0); + } + + /** + * Check if dead letter queue delivery is enabled, defaults to disabled if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getConfig().getBoolean("deadLetterQueues", false); + } + + /** + * String to affix to end of queue name when generating an alternate exchange for DLQ purposes. + */ + public String getDeadLetterExchangeSuffix() + { + return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + } + + /** + * String to affix to end of queue name when generating a queue for DLQ purposes. + */ + public String getDeadLetterQueueSuffix() + { + return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 6729a5ce0f..c4e4f701a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -347,4 +347,18 @@ public class VirtualHostConfiguration extends ConfigurationPlugin { return getLongValue("transactionTimeout.idleClose", 0L); } + + public int getMaxDeliveryCount() + { + return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount()); + } + + /** + * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 7837a9bc38..102cd42ac3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); + public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>(); private final VirtualHost _host; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index bd75f7bc51..76f86ea1b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -117,7 +117,7 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - return _queues.contains(queue); + return _queues.containsKey(queue); } public boolean isBound(AMQShortString routingKey) @@ -129,7 +129,7 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQQueue queue) { - return _queues.contains(queue); + return _queues.containsKey(queue); } public boolean hasBindings() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index ef9711004d..bbb009003c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -127,8 +127,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() { - int _msg; - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { @@ -137,6 +135,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), deliveryTag, queue.getMessageCount()); + entry.incrementDeliveryCount(); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 62dd76f832..0ea88e4ab6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting:" + body.getDeliveryTag() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } @@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (message == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); -// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { if (message.isQueueDeleted()) { - _logger.warn("Message's Queue as already been purged, unable to Reject. " + - "Dropping message should use Dead Letter Queue"); + _logger.warn("Message's Queue has already been purged, dropping message"); message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { message.discard(); } - //sendtoDeadLetterQueue(msg) return; } if (message.getMessage() == null) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("Message has already been purged, unable to Reject."); return; } @@ -98,27 +94,44 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } - // If we haven't requested message to be resent to this consumer then reject it from ever getting it. - //if (!evt.getMethod().resend) - { - message.reject(); - } + message.reject(); if (body.getRequeue()) { channel.requeue(deliveryTag); + + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); } else { - _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - //sendtoDeadLetterQueue(AMQMessage message) -// message.queue = channel.getDefaultDeadLetterQueue(); -// channel.requeue(deliveryTag); + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); + if (deliveredTooManyTimes) + { + channel.deadLetter(body.getDeliveryTag()); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + channel.deadLetter(body.getDeliveryTag()); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 4643dee0a3..20ba3af458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -72,7 +72,12 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod }; channel.rollback(task); - + + //Now resend all the unacknowledged messages back to the original subscribers. + //(Must be done after the TxnRollback-ok response). + // Why, are we not allowed to send messages back to client before the ok method? + channel.resend(false); + } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index ed8c0d0ce9..b5df212904 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control Removed # 0 - time in milliseconds OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms + +DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2} +DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1} +DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1} diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index db32f13d8d..32bf8aa17d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.message; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; @@ -65,7 +66,6 @@ public class AMQMessage extends AbstractServerMessageImpl WeakReference<AMQChannel> _channelRef; - public AMQMessage(StoredMessage<MessageMetaData> handle) { this(handle, null); @@ -122,7 +122,15 @@ public class AMQMessage extends AbstractServerMessageImpl public String getRoutingKey() { - // TODO + MessageMetaData messageMetaData = getMessageMetaData(); + if (messageMetaData != null) + { + AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey(); + if (routingKey != null) + { + return routingKey.asString(); + } + } return null; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9140a13625..6dfdc5e8b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -213,6 +213,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void setAlternateExchange(Exchange exchange); + void setAlternateExchange(String exchangeName); + Map<String, Object> getArguments(); void checkCapacity(AMQChannel channel); @@ -272,4 +274,22 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer ManagedObject getManagedObject(); void setExclusive(boolean exclusive) throws AMQException; + + /** + * Gets the maximum delivery count. If a message on this queue + * is delivered more than maximumDeliveryCount, the message will be + * routed to the {@link #getAlternateExchange()} (if set), or otherwise + * discarded. 0 indicates that maximum deliver count should not be enforced. + * + * @return maximum delivery count + */ + int getMaximumDeliveryCount(); + + /** + * Sets the maximum delivery count. + * + * @param maximumDeliveryCount maximum delivery count + */ + public void setMaximumDeliveryCount(final int maximumDeliveryCount); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index e4a6f01930..1b15bafb49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -24,9 +24,15 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory @@ -37,6 +43,11 @@ public class AMQQueueFactory public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String DLQ_ROUTING_KEY = "dlq"; + public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + private abstract static class QueueProperty { @@ -80,6 +91,24 @@ public class AMQQueueFactory } + private abstract static class QueueIntegerProperty extends QueueProperty + { + public QueueIntegerProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).intValue()); + } + + } + abstract void setPropertyValue(AMQQueue queue, int value); + } + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { new QueueLongProperty("x-qpid-maximum-message-age") { @@ -122,8 +151,14 @@ public class AMQQueueFactory { queue.setFlowResumeCapacity(value); } + }, + new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT) + { + public void setPropertyValue(AMQQueue queue, int value) + { + queue.setMaximumDeliveryCount(value); + } } - }; @@ -149,8 +184,13 @@ public class AMQQueueFactory String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException + VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException { + if (queueName == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + // Access check if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) { @@ -158,6 +198,13 @@ public class AMQQueueFactory throw new AMQSecurityException(description); } + QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName); + boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration); + if (isDLQEnabled) + { + validateDLNames(queueName); + } + int priorities = 1; String conflationKey = null; String sortingKey = null; @@ -219,10 +266,63 @@ public class AMQQueueFactory } } - return q; + if(isDLQEnabled) + { + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - } + final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + Exchange dlExchange = null; + synchronized(exchangeRegistry) + { + dlExchange = exchangeRegistry.getExchange(dlExchangeName); + + if(dlExchange == null) + { + dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + + exchangeRegistry.registerExchange(dlExchange); + + //enter the dle in the persistent store + virtualHost.getDurableConfigurationStore().createExchange(dlExchange); + } + } + + AMQQueue dlQueue = null; + + synchronized(queueRegistry) + { + dlQueue = queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(X_QPID_DLQ_ENABLED, false); + args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); + + dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); + + //enter the dlq in the persistent store + virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null); + } + q.setAlternateExchange(dlExchange); + } + return q; + } public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException { @@ -250,10 +350,108 @@ public class AMQQueueFactory arguments = new HashMap<String,Object>(); arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); } + if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) + { + if (arguments == null) + { + arguments = new HashMap<String,Object>(); + } + arguments.put(X_QPID_DLQ_ENABLED, true); + } AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; } + + /** + * Validates DLQ and DLE names + * <p> + * DLQ name and DLQ exchange name need to be validated in order to keep + * integrity in cases when queue name passes validation check but DLQ name + * or DL exchange name fails to pass it. Otherwise, we might have situations + * when queue is created but DL exchange or/and DLQ creation fail. + * <p> + * + * @param name + * queue name + * @throws IllegalArgumentException + * thrown if length of queue name or exchange name exceed 255 + */ + protected static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + } + + /** + * Checks if DLQ is enabled for the queue. + * + * @param autoDelete + * queue auto-delete flag + * @param arguments + * queue arguments + * @param qConfig + * queue configuration + * @return true if DLQ enabled + */ + protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + { + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!autoDelete) + { + boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED); + if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(X_QPID_DLQ_ENABLED); + dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue(); + } + return dlqEnabled; + } + } + return false; + } + + /** + * Generates a dead letter queue name for a given queue name + * + * @param name + * queue name + * @return DLQ name + */ + protected static String getDeadLetterQueueName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix(); + return dlQueueName; + } + + /** + * Generates a dead letter exchange name for a given queue name + * + * @param name + * queue name + * @return DL exchange name + */ + protected static String getDeadLetterExchangeName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix(); + return dlExchangeName; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index d58d95c801..b4765d6227 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; @@ -80,7 +81,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final String _queueName; // OpenMBean data types for viewMessages method - private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. + private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. @@ -139,6 +140,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _msgAttributeTypes[2] = SimpleType.LONG; // For size _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered _msgAttributeTypes[4] = SimpleType.LONG; // For queue position + _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), @@ -177,6 +179,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.getMessageCount(); } + public Integer getMaximumDeliveryCount() + { + return _queue.getMaximumDeliveryCount(); + } + public Long getMaximumMessageSize() { return _queue.getMaximumMessageSize(); @@ -295,6 +302,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } + public void setAlternateExchange(String exchangeName) + { + _queue.setAlternateExchange(exchangeName); + } + + public String getAlternateExchange() + { + Exchange exchange = _queue.getAlternateExchange(); + String name = exchange == null ? null : exchange.getName(); + return name == null ? null : name; + } + /** * Checks if there is any notification to be send to the listeners */ @@ -472,7 +491,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list headerAttributes = getMessageHeaderProperties(headerBody); - itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position}; + itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } else if(serverMsg instanceof MessageTransferMessage) { @@ -481,13 +500,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create header attributes list headerAttributes = getMessageTransferMessageHeaderProps(msg); - itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position}; + itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } else { //unknown message headerAttributes = new String[]{"N/A"}; - itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position}; + itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } CompositeData messageData = new CompositeDataSupport(_messageDataType, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java index 77da08d8c4..26112d9f53 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.AMQMessageHeader; -class InboundMessageAdapter implements InboundMessage +public class InboundMessageAdapter implements InboundMessage { private QueueEntry _entry; @@ -33,7 +33,7 @@ class InboundMessageAdapter implements InboundMessage { } - InboundMessageAdapter(QueueEntry entry) + public InboundMessageAdapter(QueueEntry entry) { _entry = entry; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index c1fb0258fa..37fad54c07 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -1,5 +1,7 @@ package org.apache.qpid.server.queue; +import java.util.Collection; + import org.apache.qpid.AMQException; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.message.ServerMessage; @@ -234,4 +236,16 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable * @return true if entry is either DEQUED or DELETED state */ boolean isDispensed(); + + /** + * Number of times this queue entry has been delivered. + * + * @return delivery count + */ + int getDeliveryCount(); + + void incrementDeliveryCount(); + + void decrementDeliveryCount(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ee1d214c1f..5bb5dc3462 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,10 +31,13 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -80,6 +83,12 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile int _deliveryState; + /** Number of times this message has been delivered */ + private volatile int _deliveryCount = 0; + private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater + .newUpdater(QueueEntryImpl.class, "_deliveryCount"); + + public QueueEntryImpl(QueueEntryList<?> queueEntryList) { @@ -406,50 +415,51 @@ public abstract class QueueEntryImpl implements QueueEntry public void routeToAlternate() { final AMQQueue currentQueue = getQueue(); - Exchange alternateExchange = currentQueue.getAlternateExchange(); + Exchange alternateExchange = currentQueue.getAlternateExchange(); - if(alternateExchange != null) + if (alternateExchange != null) + { + final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + final ServerMessage message = getMessage(); + if (rerouteQueues != null && rerouteQueues.size() != 0) { - final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); - final ServerMessage message = getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() { - public void postCommit() + ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + + txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + { + public void postCommit() + { + try { - try + for (BaseQueue queue : rerouteQueues) { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); + queue.enqueue(message); } } - - public void onRollback() + catch (AMQException e) { - + throw new RuntimeException(e); } - }); - txn.dequeue(currentQueue,message, - new ServerTransaction.Action() - { - public void postCommit() - { - discard(); - } - - public void onRollback() - { - - } - }); + } + + public void onRollback() + { + + } + }); + txn.dequeue(currentQueue, message, new ServerTransaction.Action() + { + public void postCommit() + { + discard(); + } + + public void onRollback() + { + + } + }); } } } @@ -524,4 +534,19 @@ public abstract class QueueEntryImpl implements QueueEntry return _state.isDispensed(); } + public int getDeliveryCount() + { + return _deliveryCount; + } + + public void incrementDeliveryCount() + { + _deliveryCountUpdater.incrementAndGet(this); + } + + public void decrementDeliveryCount() + { + _deliveryCountUpdater.decrementAndGet(this); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ab47d89e01..7717c8ebfc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -187,7 +187,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private long _createTime = System.currentTimeMillis(); private ConfigurationPlugin _queueConfiguration; - + /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ + private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { @@ -356,6 +357,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _alternateExchange = exchange; } + public void setAlternateExchange(String exchangeName) + { + if(exchangeName == null || exchangeName.equals("")) + { + _alternateExchange = null; + return; + } + + Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName)); + if (exchange == null) + { + throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost."); + } + setAlternateExchange(exchange); + } + public Map<String, Object> getArguments() { return _arguments; @@ -521,13 +538,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //Reconfigure the queue for to reflect this new binding. ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - if (config != null) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); + } // Reconfigure with new config. configure(config); } @@ -2108,6 +2124,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); + setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount()); _capacity = ((QueueConfiguration)config).getCapacity(); _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); } @@ -2229,4 +2246,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _logActor; } + + public int getMaximumDeliveryCount() + { + return _maximumDeliveryCount; + } + + public void setMaximumDeliveryCount(final int maximumDeliveryCount) + { + _maximumDeliveryCount = maximumDeliveryCount; + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index fdd533b704..7d128f2bc5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -20,18 +20,31 @@ */ package org.apache.qpid.server; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase +public class AMQBrokerManagerMBeanTest extends QpidTestCase { private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; @@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName))); } + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the + * maximum delivery count to be set on the Queue. + */ + public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception + { + final Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount"); + + final QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + final AMQQueue createdQueue = qReg.getQueue(queueName); + assertNotNull("The queue was not registered as expected", createdQueue); + assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount()); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of + * a Priority Queue. + */ + public void testCreatePriorityQueue() throws Exception + { + int numPriorities = 7; + Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities); + + AMQShortString queueName = new AMQShortString("testCreatePriorityQueue"); + + QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + AMQQueue queue = qReg.getQueue(queueName); + assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities()); + } + @Override public void setUp() throws Exception { super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", "test"); + configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test"); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); _queueRegistry = _vHost.getQueueRegistry(); _exchangeRegistry = _vHost.getExchangeRegistry(); } + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 9941c00499..e1a5e7d338 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -24,6 +24,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; public class QueueConfigurationTest extends TestCase { @@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase fullEnv.setProperty("queues.maximumMessageSize", 1); fullEnv.setProperty("queues.maximumMessageCount", 1); fullEnv.setProperty("queues.minimumAlertRepeatGap", 1); + fullEnv.setProperty("queues.deadLetterQueues", true); + fullEnv.setProperty("queues.maximumDeliveryCount", 5); _fullHostConf = new VirtualHostConfiguration("test", fullEnv); } + public void testMaxDeliveryCount() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount()); + + } + finally + { + ApplicationRegistry.remove(); + } + } + + /** + * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden + * at a broker or virtualhost level. + * @throws Exception + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true); + qConf = new QueueConfiguration("test", vhostConfig); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + } + finally + { + ApplicationRegistry.remove(); + } + } + public void testGetMaximumMessageAge() throws ConfigurationException { // Check default value diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 9ee2ed3812..7739f9976e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; import java.util.Locale; import org.apache.commons.configuration.ConfigurationException; @@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase ce.getMessage()); } } + + public void testMaxDeliveryCountDefault() throws Exception + { + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(0, serverConfig.getMaxDeliveryCount()); + } + + public void testMaxDeliveryCount() throws Exception + { + _config.setProperty("maximumDeliveryCount", 5); + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(5, serverConfig.getMaxDeliveryCount()); + } + + /** + * Test XML configuration file correctly enables dead letter queues + */ + public void testDeadLetterQueueConfigurationFile() throws Exception + { + // Write config + File xml = File.createTempFile(getClass().getName(), "xml"); + xml.deleteOnExit(); + FileWriter config = new FileWriter(xml); + config.write("<broker>\n"); + writeSecurity(config); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("<virtualhosts>\n"); + config.write("<virtualhost>\n"); + config.write("<name>test</name>\n"); + config.write("<test>\n"); + config.write("<queues>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("<queue>\n"); + config.write("<name>biggles</name>\n"); + config.write("<biggles>\n"); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("</biggles>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>beetle</name>\n"); + config.write("<beetle />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</test>\n"); + config.write("</virtualhost>\n"); + config.write("<virtualhost>\n"); + config.write("<name>extra</name>\n"); + config.write("<extra>\n"); + config.write("<queues>\n"); + config.write("<queue>\n"); + config.write("<name>r2d2</name>\n"); + config.write("<r2d2>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("</r2d2>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>c3p0</name>\n"); + config.write("<c3p0 />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</extra>\n"); + config.write("</virtualhost>\n"); + config.write("</virtualhosts>\n"); + config.write("</broker>\n"); + config.close(); + + // Load config + ApplicationRegistry.remove(); + ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml); + ApplicationRegistry.initialise(registry); + ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration(); + + VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test"); + assertNotNull("Host 'test' is not found", test); + VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra"); + assertNotNull("Host 'extra' is not found", test); + + QueueConfiguration biggles = test.getQueueConfiguration("biggles"); + QueueConfiguration beetle = test.getQueueConfiguration("beetle"); + QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2"); + QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0"); + + // Validate config + assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled()); + assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled()); + assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled()); + assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled()); + assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled()); + assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled()); + assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled()); + } + + /** + * Convenience method to output required security preamble for broker config + */ + private void writeSecurity(Writer out) throws Exception + { + out.write("\t<management><enabled>false</enabled></management>\n"); + out.write("\t<security>\n"); + out.write("\t\t<pd-auth-manager>\n"); + out.write("\t\t\t<principal-database>\n"); + out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); + out.write("\t\t\t\t<attributes>\n"); + out.write("\t\t\t\t\t<attribute>\n"); + out.write("\t\t\t\t\t\t<name>passwordFile</name>\n"); + out.write("\t\t\t\t\t\t<value>/dev/null</value>\n"); + out.write("\t\t\t\t\t</attribute>\n"); + out.write("\t\t\t\t</attributes>\n"); + out.write("\t\t\t</principal-database>\n"); + out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n"); + out.write("\t\t</pd-auth-manager>\n"); + out.write("\t</security>\n"); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index b133d53ac5..f6cd397217 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; @@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { @Override - public void setUp() throws Exception - { - super.setUp(); - // Set the default configuration items - getConfigXml().clear(); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test"); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); - - getConfigXml().addProperty("virtualhosts.virtualhost.name", getName()); - getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); - } - - @Override public void createBroker() { // Prevent auto broker startup @@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase assertEquals(3, bTest.getMaximumMessageAge()); } + public void testMaxDeliveryCount() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + + // Enabled specifically + assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount()); + + // Enabled by test vhost default + assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount()); + + // Disabled specifically + assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount()); + } + + /** + * Tests the full set of configuration options for enabling DLQs in the broker configuration. + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + + getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName()); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra"); + + // Enabled specifically + assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled()); + assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled()); + + // Enabled by test vhost default + assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled()); + + // Disabled specifically + assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled()); + + // Using broker default of disabled + assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled()); + assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); + + // Get queues + AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); + AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); + AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); + AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + + // Disabled specifically for this queue, overriding virtualhost setting + assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); + + // Enabled for all queues on the virtualhost + assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange()); + + // Enabled specifically for this queue, overriding the default broker setting of disabled + assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange()); + + // Disabled by the default broker setting + assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange()); + } + /** * Test that the house keeping pool sizes is correctly processed * @@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase vhost.getHouseKeepingTaskCount()); // Currently the two are tasks: - // ExpiredMessageTask from VirtualHost + // ExpiredMessageTask from VirtualHost // UpdateTask from the QMF ManagementExchange } @@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name", "testdb"); - + try { super.createBroker(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b73987abf..ea2fe90da6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return null; } + + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 27891289fb..2b7d1d7f26 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,39 +20,76 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQQueueFactoryTest extends InternalBrokerBaseCase +public class AMQQueueFactoryTest extends QpidTestCase { QueueRegistry _queueRegistry; VirtualHost _virtualHost; - int _defaultQueueCount; @Override public void setUp() throws Exception { super.setUp(); - ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance(); - _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test"); + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); + configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName()); + + _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName()); _queueRegistry = _virtualHost.getQueueRegistry(); - _defaultQueueCount = _queueRegistry.getQueues().size(); } @Override public void tearDown() throws Exception { - assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size()); - super.tearDown(); + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void verifyRegisteredQueueCount(int count) + { + assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); } + private void verifyQueueRegistered(String queueName) + { + assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + } + public void testPriorityQueueRegistration() throws Exception { FieldTable fieldTable = new FieldTable(); @@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase false, _virtualHost, fieldTable); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + verifyQueueRegistered("testPriorityQueue"); + verifyRegisteredQueueCount(1); } public void testSimpleQueueRegistration() throws Exception { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, + AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration"); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, _virtualHost, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); + verifyQueueRegistered("testSimpleQueueRegistration"); + + //verify that no alternate exchange or DLQ were produced + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + + assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); + assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * cause the alternate exchange to be set and DLQ to be produced. + * @throws AMQException + */ + public void testDeadLetterQueueEnabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration + * are not applied to the DLQ itself. + * @throws AMQException + */ + public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException + { + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true"); + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5"); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * result in the alternate exchange being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueDisabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); + assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName)); + + assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * creating an auto-delete queue, does not result in the alternate exchange + * being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + //create an autodelete queue + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false, + _virtualHost, fieldTable); + assertTrue("Queue should be autodelete", queue.isAutoDelete()); + + //ensure that the autodelete property overrides the request to enable DLQ + assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); + assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName)); + assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * the desired effect. + */ + public void testMaximumDeliveryCount() throws Exception + { + final FieldTable fieldTable = new FieldTable(); + fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * that queue is created with a default maximumDeliveryCount of zero (unless set in config). + */ + public void testMaximumDeliveryCountDefault() throws Exception + { + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests queue creation with queue name set to null + */ + public void testQueueNameNullValidation() + { + try + { + AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null); + fail("queue with null name can not be created!"); + } + catch (Exception e) + { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Queue name must not be null", e.getMessage()); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DLQ name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQUEUE"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLQ name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DL exchange name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQ"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLE name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + private String generateStringWithLength(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return sb.toString(); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 8f3023f269..f70250132a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } + public void testMaximumDeliveryCount() throws IOException + { + assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount()); + } + + public void testViewAllMessages() throws Exception + { + final int messageCount = 5; + sendPersistentMessages(messageCount); + + + final TabularData messageTable = _queueMBean.viewMessages(1L, 5L); + assertNotNull("Message table should not be null", messageTable); + assertEquals("Unexpected number of rows", messageCount, messageTable.size()); + + + final Iterator rowIterator = messageTable.values().iterator(); + // Get its message ID + final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next(); + final Long msgId = (Long) row1.get("AMQ MessageId"); + final Long queuePosition = (Long) row1.get("Queue Position"); + final Integer deliveryCount = (Integer) row1.get("Delivery Count"); + + assertNotNull("Row should have value for queue position", queuePosition); + assertNotNull("Row should have value for msgid", msgId); + assertNotNull("Row should have value for deliveryCount", deliveryCount); + } + @Override public void setUp() throws Exception @@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } + private void sendPersistentMessages(int messageCount) throws AMQException + { + sendMessages(messageCount, true); + assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean + .getMessageCount().intValue()); + } + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { return sendMessages(messageCount, persistent, 0l, 0l); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 4c31092983..0daf79122c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue { } + + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + + @Override + public void setMaximumDeliveryCount(int maximumDeliveryCount) + { + } + + @Override + public void setAlternateExchange(String exchangeName) + { + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 864b9ad368..7ad002c248 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry return null; } + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ad7885f195..6879fe0cfd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -284,7 +284,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); - _logger.debug("AMQP version " + amqpVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("AMQP version " + amqpVersion); + } _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); @@ -1485,4 +1488,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _lastFailoverTime; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index f9a38138ba..1df809c67c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -75,6 +75,8 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _exchangeExistsChecked; + private RejectBehaviour _rejectBehaviour; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -227,6 +229,8 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = binding.getQueueName() == null ? null : binding.getQueueName(); _routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey(); _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); + final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR); + _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase()); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) @@ -294,7 +298,7 @@ public abstract class AMQDestination implements Destination, Referenceable _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; - + _rejectBehaviour = null; if (_logger.isDebugEnabled()) { _logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax); @@ -499,6 +503,13 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + if (_rejectBehaviour != null) + { + sb.append(BindingURL.OPTION_REJECT_BEHAVIOUR); + sb.append("='" + _rejectBehaviour + "'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + //removeKey the last char '?' if there is no options , ',' if there are. sb.deleteCharAt(sb.length() - 1); url = sb.toString(); @@ -842,4 +853,19 @@ public abstract class AMQDestination implements Destination, Referenceable { return _addressResolved.get() > time; } + + /** + * This option is only applicable for 0-8/0-9/0-9-1 protocols connection + * <p> + * It tells the client to delegate the requeue/DLQ decision to the + * server .If this option is not specified, the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + * + * @return destination reject behaviour + */ + public RejectBehaviour getRejectBehaviour() + { + return _rejectBehaviour; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ef44221ec1..8984b7ca8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -310,7 +310,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the highest received delivery tag. */ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - + + /** Pre-fetched message tags */ + protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); @@ -2925,11 +2928,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _producers.put(new Long(producerId), producer); } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(0, requeue, true); - } - /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) @@ -3235,7 +3233,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : _consumers.values()) { List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _unacknowledgedMessageTags.addAll(tags); + _prefetchedMessageTags.addAll(tags); } setConnectionStopped(isStopped); @@ -3345,7 +3343,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else if (_usingDispatcherForCleanup) { - _unacknowledgedMessageTags.add(deliveryTag); + _prefetchedMessageTags.add(deliveryTag); } else { @@ -3548,4 +3546,5 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Rollback mark is set to " + _rollbackMark.get()); } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e33410f5fe..96df463481 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; +import java.util.ArrayList; import java.util.Map; import javax.jms.Destination; @@ -40,7 +41,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; @@ -62,7 +62,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; @@ -223,6 +222,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void sendRecover() throws AMQException, FailoverException { + enforceRejectBehaviourDuringRecover(); + _prefetchedMessageTags.clear(); _unacknowledgedMessageTags.clear(); if (isStrictAMQP()) @@ -259,6 +260,49 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } } + private void enforceRejectBehaviourDuringRecover() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags); + } + ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values()); + boolean messageListenerFound = false; + boolean serverRejectBehaviourFound = false; + for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + { + if (consumer.isMessageListenerSet()) + { + messageListenerFound = true; + } + if (RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) + { + serverRejectBehaviourFound = true; + } + } + _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)"); + + if (serverRejectBehaviourFound) + { + //reject(false) any messages we don't want returned again + switch(_acknowledgeMode) + { + case Session.DUPS_OK_ACKNOWLEDGE: + case Session.AUTO_ACKNOWLEDGE: + if (!messageListenerFound) + { + break; + } + case Session.CLIENT_ACKNOWLEDGE: + for(Long tag : _unacknowledgedMessageTags) + { + rejectMessage(tag, false); + } + break; + } + } + } + public void releaseForRollback() { // Reject all the messages that have been received in this session and @@ -267,6 +311,17 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // Otherwise messages will be able to arrive out of order to a second // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. + boolean normalRejectBehaviour = true; + for (BasicMessageConsumer_0_8 consumer : _consumers.values()) + { + if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) + { + normalRejectBehaviour = false; + //no need to consult other consumers now, found server behaviour. + break; + } + } + while (true) { Long tag = _deliveredMessageTags.poll(); @@ -275,13 +330,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B break; } - rejectMessage(tag, true); + rejectMessage(tag, normalRejectBehaviour); } } public void rejectMessage(long deliveryTag, boolean requeue) { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)|| + ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) { if (_logger.isDebugEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 7bb400fada..c6e5fbb019 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -147,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private List<StackTraceElement> _closedStack = null; - protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -211,6 +210,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); _arguments = ft; + } public AMQDestination getDestination() @@ -814,31 +814,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - * - * @return the lastDeliveryTag to acknowledge - */ - Long getLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - Long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - return lastDeliveryTag; - } - - return null; - } - void notifyError(Throwable cause) { // synchronized (_closed) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cf1d7cedeb..efcbfd5532 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.BindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final RejectBehaviour _rejectBehaviour; + protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, @@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + if (destination.getRejectBehaviour() != null) + { + _rejectBehaviour = destination.getRejectBehaviour(); + } + else + { + ConnectionURL connectionURL = connection.getConnectionURL(); + String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR); + if (rejectBehaviour != null) + { + _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase()); + } + else + { + // use the default value for all connections, if not set + rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString()); + _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase()); + } + } } void sendCancel() throws AMQException, FailoverException @@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { } + + public RejectBehaviour getRejectBehaviour() + { + return _rejectBehaviour; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java new file mode 100644 index 0000000000..e3c958044e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +/** + * This enum can be used only with for 0-8/0-9/0-9-1 protocols connections to notify + * the client to delegate the requeue/DLQ decision to the server + * if <code>SERVER</server> value is specified. Otherwise the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + */ +public enum RejectBehaviour +{ + NORMAL, SERVER; +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 26641982d7..24d9360cfa 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -41,7 +41,16 @@ public interface ConnectionURL public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; - public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + + /** + * This option is only applicable for 0-8/0-9/0-9-1 protocols connection + * <p> + * It tells the client to delegate the requeue/DLQ decision to the + * server .If this option is not specified, the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + */ + public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour"; public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java new file mode 100644 index 0000000000..3a565f0f0d --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQInvalidArgumentException; + +public class AMQConnectionUnitTest extends TestCase +{ + + public void testExceptionReceived() + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; + AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null); + final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>(); + try + { + MockAMQConnection connection = new MockAMQConnection(url); + connection.setExceptionListener(new ExceptionListener() + { + + @Override + public void onException(JMSException jmsException) + { + receivedException.set(jmsException); + } + }); + connection.exceptionReceived(expectedException); + } + catch (Exception e) + { + fail("Failure to test exceptionRecived:" + e.getMessage()); + } + JMSException exception = receivedException.get(); + assertNotNull("Expected JMSException but got null", exception); + assertEquals("JMSException error code is incorrect", Integer.toString(expectedException.getErrorCode().getCode()), exception.getErrorCode()); + assertNotNull("Expected not null message for JMSException", exception.getMessage()); + assertTrue("JMSException error message is incorrect", exception.getMessage().contains(expectedException.getMessage())); + assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException()); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java new file mode 100644 index 0000000000..d8d94ba40e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import javax.jms.Session; + +import org.apache.qpid.test.unit.message.TestAMQSession; +import org.apache.qpid.url.AMQBindingURL; + +import junit.framework.TestCase; + +public class BasicMessageConsumer_0_8_Test extends TestCase +{ + /** + * Test that if there is a value for Reject Behaviour specified for the Destination + * used to create the Consumer, it overrides the value for the Connection. + */ + public void testDestinationRejectBehaviourOverridesDefaultConnection() throws Exception + { + /* + * Check that when the connection does not have a value applied that this + * is successfully overridden with a specific value by the consumer. + */ + String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; + AMQConnection conn = new MockAMQConnection(connUrlString); + + String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; + AMQBindingURL burl = new AMQBindingURL(url); + AMQDestination queue = new AMQQueue(burl); + + AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour()); + } + + /** + * Check that when the connection does have a specific value applied that this + * is successfully overridden with another specific value by the consumer. + */ + public void testDestinationRejectBehaviourSpecified() throws Exception + { + final String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'"; + final AMQConnection conn = new MockAMQConnection(connUrlString); + + final String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='normal'"; + final AMQBindingURL burl = new AMQBindingURL(url); + final AMQDestination queue = new AMQQueue(burl); + + final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); + } + + /** + * Test that if no value for Reject Behaviour is applied to the Destination, then the value + * from the connection is used and acts as expected. + */ + public void testRejectBehaviourDetectedFromConnection() throws Exception + { + /* + * Check that when the connection does have a specific value applied that this + * is successfully detected by the consumer. + */ + String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='normal'"; + AMQConnection conn = new MockAMQConnection(connUrlString); + + String url = "exchangeClass://exchangeName/Destination/Queue"; + AMQBindingURL burl = new AMQBindingURL(url); + AMQDestination queue = new AMQQueue(burl); + + assertNull("Reject behaviour should have been null", queue.getRejectBehaviour()); + + AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); + } + + + protected RejectBehaviour getRejectBehaviour(AMQDestination destination) + { + return destination.getRejectBehaviour(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 73e67469ae..919809edc3 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -55,4 +55,9 @@ public class MockAMQConnection extends AMQConnection _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); return null; } + + public AMQConnectionDelegate getDelegate() + { + return _delegate; + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 4624b36fea..5a5a3a0bd9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ public class ConnectionURLTest extends TestCase ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); - assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); + assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); assertTrue(connectionurl.getVirtualHost().equals("/test")); @@ -338,7 +338,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getPassword().equals("pass")); assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getClientName().equals("client_id")); - + assertTrue(connectionurl.getBrokerCount() == 1); } @@ -457,7 +457,6 @@ public class ConnectionURLTest extends TestCase assertTrue(service.getTransport().equals("tcp")); - assertTrue(service.getHost().equals("localhost")); assertTrue(service.getPort() == 5672); assertEquals("jim",service.getProperty("foo")); @@ -468,7 +467,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getOption("timeout").equals("200")); assertTrue(connectionurl.getOption("immediatedelivery").equals("true")); } - + /** * Test that options other than failover and brokerlist are returned in the string representation. * <p> @@ -477,7 +476,7 @@ public class ConnectionURLTest extends TestCase public void testOptionToString() throws Exception { ConnectionURL url = new AMQConnectionURL("amqp://user:pass@temp/test?maxprefetch='12345'&brokerlist='tcp://localhost:5672'"); - + assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'")); } @@ -493,10 +492,10 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); BrokerDetails service = connectionurl.getBrokerDetails(0); - assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getTransport().equals("tcp")); assertTrue(service.getHost().equals("under_score")); assertTrue(service.getPort() == 6672); - + url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'"; connectionurl = new AMQConnectionURL(url); @@ -507,11 +506,44 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); service = connectionurl.getBrokerDetails(0); - assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getTransport().equals("tcp")); assertTrue(service.getHost().equals("under_score")); assertTrue(service.getPort() == 5672); } - + + + public void testRejectBehaviourPresent() throws Exception + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'"; + + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertTrue(connectionURL.getFailoverMethod() == null); + assertTrue(connectionURL.getUsername().equals("guest")); + assertTrue(connectionURL.getPassword().equals("guest")); + assertTrue(connectionURL.getVirtualHost().equals("/test")); + + //check that the reject behaviour option is returned as expected + assertEquals("Reject behaviour option was not as expected", "server", + connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR)); + } + + public void testRejectBehaviourNotPresent() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + //check that the reject behaviour option is null as expected + assertNull("Reject behaviour option was not as expected", + connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR)); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 7de09cff45..2c32e4c559 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.client.destinationurl; import junit.framework.TestCase; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,6 +193,67 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("test:testQueueD")); } + public void testRejectBehaviourPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + assertTrue(burl.getExchangeClass().equals("exchangeClass")); + assertTrue(burl.getExchangeName().equals("exchangeName")); + assertTrue(burl.getDestinationName().equals("Destination")); + assertTrue(burl.getQueueName().equals("Queue")); + + //check that the MaxDeliveryCount property has the right value + assertEquals("server",burl.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR)); + + //check that the MaxDeliveryCount value is correctly returned from an AMQDestination + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertEquals("Reject behaviour is unexpected", RejectBehaviour.SERVER, dest.getRejectBehaviour()); + } + + public void testRejectBehaviourNotPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertTrue(burl.getExchangeClass().equals("exchangeClass")); + assertTrue(burl.getExchangeName().equals("exchangeName")); + assertTrue(burl.getDestinationName().equals("Destination")); + assertTrue(burl.getQueueName().equals("Queue")); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DestinationURLTest.class); diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 88e2fb0176..a36e7c214e 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -108,6 +108,14 @@ public class ClientProperties public static final String QPID_TCP_NODELAY_PROP_NAME = "qpid.tcp_nodelay"; public static final String AMQJ_TCP_NODELAY_PROP_NAME = "amqj.tcp_nodelay"; + /** + * System property to set the reject behaviour. default value will be 'normal' but can be + * changed to 'server' in which case the server decides whether a message should be requeued + * or dead lettered. + * This can be overridden by the more specific settings at connection or binding URL level. + */ + public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour"; + /* public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 9996fff311..0e6c865a16 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -37,6 +37,15 @@ public interface BindingURL public static final String OPTION_ROUTING_KEY = "routingkey"; public static final String OPTION_BINDING_KEY = "bindingkey"; + /** + * This option is only applicable for 0-8/0-9/0-9-1 protocols connection + * <p> + * It tells the client to delegate the requeue/DLQ decision to the + * server .If this option is not specified, the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + */ + public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour"; + String getURL(); diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java index b5c80a4fed..b74342df1f 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java @@ -23,6 +23,7 @@ package org.apache.qpid.management.common.mbeans; import java.io.IOException; import java.util.List; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanException; @@ -118,6 +119,24 @@ public interface ManagedBroker throws IOException, JMException, MBeanException; /** + * Create a new Queue in the VirtualHost + * + * @since Qpid JMX API 2.4 + * @param queueName name of the new queue + * @param durable true if the queue should be durable + * @param owner owner + * @param arguments declaration arguments for use when creating the queue, may be null. + * @throws IOException + * @throws JMException + */ + @MBeanOperation(name="createNewQueue", description="Create a new Queue in the VirtualHost", impact= MBeanOperationInfo.ACTION) + void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName, + @MBeanOperationParameter(name="owner", description="Owner name")String owner, + @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable, + @MBeanOperationParameter(name="arguments", description="Map of arguments")Map<String,Object> arguments) + throws IOException, JMException; + + /** * Unregisters the Queue bindings, removes the subscriptions and unregisters * from the managed objects. * diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java index be31d8ef88..c23a0f5076 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java @@ -50,7 +50,8 @@ public interface ManagedQueue String MSG_SIZE = "Size(bytes)"; String MSG_REDELIVERED = "Redelivered"; String MSG_QUEUE_POS = "Queue Position"; - List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS)); + String MSG_DELIVERY_COUNT = "Delivery Count"; + List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS, MSG_DELIVERY_COUNT)); List<String> VIEW_MSGS_TABULAR_UNIQUE_INDEX = Collections.unmodifiableList(Arrays.asList(MSG_QUEUE_POS)); //CompositeType key/description information for message content @@ -67,6 +68,7 @@ public interface ManagedQueue static final String ATTR_MAX_MSG_COUNT = "MaximumMessageCount"; static final String ATTR_MAX_QUEUE_DEPTH = "MaximumQueueDepth"; static final String ATTR_MAX_MSG_SIZE = "MaximumMessageSize"; + static final String ATTR_MAXIMUM_DELIVERY_COUNT = "MaximumDeliveryCount"; static final String ATTR_DURABLE = "Durable"; static final String ATTR_AUTODELETE = "AutoDelete"; static final String ATTR_CONSUMER_COUNT = "ConsumerCount"; @@ -78,7 +80,8 @@ public interface ManagedQueue static final String ATTR_FLOW_OVERFULL = "FlowOverfull"; static final String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity"; static final String ATTR_EXCLUSIVE = "Exclusive"; - + static final String ATTR_ALT_EXCHANGE = "AlternateExchange"; + //All attribute names constant static final List<String> QUEUE_ATTRIBUTES = Collections.unmodifiableList( @@ -91,6 +94,7 @@ public interface ManagedQueue ATTR_MAX_MSG_COUNT, ATTR_MAX_QUEUE_DEPTH, ATTR_MAX_MSG_SIZE, + ATTR_MAXIMUM_DELIVERY_COUNT, ATTR_DURABLE, ATTR_AUTODELETE, ATTR_CONSUMER_COUNT, @@ -101,7 +105,9 @@ public interface ManagedQueue ATTR_CAPACITY, ATTR_FLOW_OVERFULL, ATTR_FLOW_RESUME_CAPACITY, - ATTR_EXCLUSIVE)))); + ATTR_EXCLUSIVE, + ATTR_ALT_EXCHANGE + )))); /** * Returns the Name of the ManagedQueue. @@ -120,6 +126,16 @@ public interface ManagedQueue Integer getMessageCount() throws IOException; /** + * Maximum number of times a message is permitted to be delivered or zero if not enforced. + * + * @since Qpid JMX API 2.4 + * @return maximum delivery count + * @throws IOException + */ + @MBeanAttribute(name="MaximumDeliveryCount", description = "Maximum number of times a message is permitted to be delivered or zero if not enforced") + Integer getMaximumDeliveryCount() throws IOException; + + /** * Tells the total number of messages receieved by the queue since startup. * @return total number of messages received. * @throws IOException @@ -309,7 +325,7 @@ public interface ManagedQueue /** * Sets whether the queue is exclusive or not. - * + * * @since Qpid JMX API 2.0 * @param exclusive the capacity in bytes * @throws IOException @@ -318,6 +334,25 @@ public interface ManagedQueue @MBeanAttribute(name="Exclusive", description="Whether the queue is Exclusive or not") void setExclusive(boolean exclusive) throws IOException, JMException; + /** + * Sets the Alternate Exchange for the queue, for use in dead letter queue functionality. + * + * @since Qpid JMX API 2.4 + * @param the name of the exchange to use. Specifying null or the empty string will clear the alternate exchange. + * @throws IOException + */ + void setAlternateExchange(String exchangeName) throws IOException; + + /** + * Returns the name of the Alternate Exchange for the queue, or null if there isn't one. + * + * @since Qpid JMX API 2.4 + * @return the name of the Alternate Exchange for the queue, or null if there isn't one + * @throws IOException + */ + @MBeanAttribute(name="AlternateExchange", description="Alternate exchange for the queue") + String getAlternateExchange() throws IOException; + //********** Operations *****************// diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index 12ae69571e..9d40edd8d0 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -47,7 +47,7 @@ public interface ServerInformation * Qpid JMX API 1.1 can be assumed. */ int QPID_JMX_API_MAJOR_VERSION = 2; - int QPID_JMX_API_MINOR_VERSION = 3; + int QPID_JMX_API_MINOR_VERSION = 4; /** diff --git a/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java b/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java index 1a4a73f207..c931b921df 100644 --- a/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java +++ b/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.management.common.mbeans; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.util.ArrayList; import java.util.List; import javax.management.MBeanAttributeInfo; diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java index 4a59176374..faa6769c63 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java @@ -47,7 +47,7 @@ public abstract class ApplicationRegistry //max supported broker management interface supported by this release of the management console public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2; - public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3; + public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4; public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc"; diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java index 390a7b55e4..97ba9afc32 100644 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java @@ -18,6 +18,9 @@ */ package org.apache.qpid.management.jmx; +import java.util.Collections; +import java.util.Map; + import javax.management.JMException; import javax.management.MBeanException; import javax.management.ObjectName; @@ -25,6 +28,8 @@ import javax.management.ObjectName; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -136,4 +141,26 @@ public class ManagedBrokerMBeanTest extends QpidBrokerTestCase final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName); assertNotNull("Exchange should exist", defaultExchange); } + + /** + * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests + * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. + */ + public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Integer deliveryCount = 1; + final Map<String, Object> args = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); + managedBroker.createNewQueue(queueName, "testowner", true, args); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java new file mode 100644 index 0000000000..e59dac8c01 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -0,0 +1,695 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Test that the MaxRedelivery feature works as expected, allowing the client to reject + * messages during rollback/recover whilst specifying they not be requeued if delivery + * to an application has been attempted a specified number of times. + * + * General approach: specify a set of messages which will cause the test client to then + * deliberately rollback/recover the session after consuming, and monitor that they are + * re-delivered the specified number of times before the client rejects them without requeue + * and then verify that they are not subsequently redelivered. + * + * Additionally, the queue used in the test is configured for DLQ'ing, and the test verifies + * that the messages rejected without requeue are then present on the appropriate DLQ. + */ +public class MaxDeliveryCountTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class); + private boolean _failed; + private String _failMsg; + private static final int MSG_COUNT = 15; + private static final int MAX_DELIVERY_COUNT = 2; + private CountDownLatch _awaitCompletion; + + public void setUp() throws Exception + { + //enable DLQ/maximumDeliveryCount support for all queues at the vhost level + setConfigurationProperty("virtualhosts.virtualhost.test.queues.maximumDeliveryCount", + String.valueOf(MAX_DELIVERY_COUNT)); + setConfigurationProperty("virtualhosts.virtualhost.test.queues.deadLetterQueues", + String.valueOf(true)); + + //Ensure management is on + setConfigurationProperty("management.enabled", "true"); + setConfigurationProperty("management.ssl.enabled", "false"); + + // Set client-side flag to allow the server to determine if messages + // dead-lettered or requeued. + setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, "server"); + + super.setUp(); + + boolean durableSub = isDurSubTest(); + + //declare the test queue + Connection consumerConnection = getConnection(); + Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination destination = getDestination(consumerSession, durableSub); + if(durableSub) + { + consumerSession.createDurableSubscriber((Topic)destination, getName()).close(); + } + else + { + consumerSession.createConsumer(destination).close(); + } + + consumerConnection.close(); + + //Create Producer put some messages on the queue + Connection producerConnection = getConnection(); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub)); + + for (int count = 1; count <= MSG_COUNT; count++) + { + Message msg = producerSession.createTextMessage(generateContent(count)); + msg.setIntProperty("count", count); + producer.send(msg); + } + + producerConnection.close(); + + _failed = false; + _awaitCompletion = new CountDownLatch(1); + } + + private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException + { + if(durableSub) + { + return consumerSession.createTopic(getTestQueueName()); + } + else + { + return consumerSession.createQueue(getTestQueueName()); + } + } + + private String generateContent(int count) + { + return "Message " + count + " content."; + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * Client-Ack session. + */ + public void testAsynchronousClientAckSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * transacted session. + */ + public void testAsynchronousTransactedSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on an + * Auto-Ack session. + */ + public void testAsynchronousAutoAckSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * Dups-OK session. + */ + public void testAsynchronousDupsOkSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using recieve() on a + * Client-Ack session. + */ + public void testSynchronousClientAckSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(3); + redeliverMsgs.add(14); + + doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false); + } + + /** + * Test that Max Redelivery is enforced when using recieve() on a + * transacted session. + */ + public void testSynchronousTransactedSession() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false); + } + + public void testDurableSubscription() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true); + } + + public void doTest(final int deliveryMode, final ArrayList<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception + { + final Connection clientConnection = getConnection(); + + final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false; + final Session clientSession = clientConnection.createSession(transacted, deliveryMode); + + MessageConsumer consumer; + Destination dest = getDestination(clientSession, durableSub); + AMQQueue checkQueue; + if(durableSub) + { + consumer = clientSession.createDurableSubscriber((Topic)dest, getName()); + checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName()); + } + else + { + consumer = clientSession.createConsumer(dest); + checkQueue = (AMQQueue) dest; + } + + assertEquals("The queue should have " + MSG_COUNT + " msgs at start", + MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); + + clientConnection.start(); + + int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size()); + + if(synchronous) + { + doSynchronousTest(clientSession, consumer, clientSession.getAcknowledgeMode(), + MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs); + } + else + { + addMessageListener(clientSession, consumer, clientSession.getAcknowledgeMode(), + MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs); + + try + { + if (!_awaitCompletion.await(20, TimeUnit.SECONDS)) + { + fail("Test did not complete in 20 seconds."); + } + } + catch (InterruptedException e) + { + fail("Unable to wait for test completion"); + throw e; + } + + if(_failed) + { + fail(_failMsg); + } + } + consumer.close(); + + //check the source queue is now empty + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); + + //check the DLQ has the required number of rejected-without-requeue messages + verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub); + + if(isBrokerStorePersistent()) + { + //restart the broker to verify persistence of the DLQ and the messages on it + clientConnection.close(); + + restartBroker(); + + final Connection clientConnection2 = getConnection(); + final Session clientSession2 = clientConnection2.createSession(transacted, deliveryMode); + clientConnection2.start(); + + //verify the messages on the DLQ + verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub); + clientConnection2.close(); + } + else + { + + //verify the messages on the DLQ + verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub); + clientConnection.close(); + } + + } + + private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException + { + AMQDestination checkQueueDLQ; + if(durableSub) + { + checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + else + { + checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + + assertEquals("The DLQ should have " + expected + " msgs on it", expected, + ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ)); + } + + private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException + { + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer; + if(durableSub) + { + if (isBroker010()) + { + consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + } + else + { + consumer = clientSession.createDurableSubscriber(clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + else + { + consumer = clientSession.createConsumer( + clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + } + + //keep track of the message we expect to still be on the DLQ + List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs); + int numMsg = outstandingMessages.size(); + + for(int i = 0; i < numMsg; i++) + { + Message message = consumer.receive(250); + + assertNotNull("failed to consume expected message " + i + " from DLQ", message); + assertTrue("message " + i + " was the wrong type", message instanceof TextMessage); + + //using Integer here to allow removing the value from the list, using int + //would instead result in removal of the element at that index + Integer msgId = message.getIntProperty("count"); + + TextMessage txt = (TextMessage) message; + _logger.info("Received message " + msgId + " at " + i + " from the DLQ: " + txt.getText()); + + assertTrue("message " + i + " was not one of those which should have been on the DLQ", + redeliverMsgs.contains(msgId)); + assertTrue("message " + i + " was not one of those expected to still be on the DLQ", + outstandingMessages.contains(msgId)); + assertEquals("Message " + i + " content was not as expected", generateContent(msgId), txt.getText()); + + //remove from the list of outstanding msgs + outstandingMessages.remove(msgId); + } + + if(outstandingMessages.size() > 0) + { + String failures = ""; + for(Integer msg : outstandingMessages) + { + failures = failures.concat(msg + " "); + } + fail("some DLQ'd messages were not found on the DLQ: " + failures); + } + } + + private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, + final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException + { + if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE) + { + failAsyncTest("Max Delivery feature is not supported with this acknowledgement mode" + + "when using asynchronous message delivery."); + } + + consumer.setMessageListener(new MessageListener() + { + private int _deliveryAttempts = 0; //number of times given message(s) have been seen + private int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover + private int _totalNumDeliveries = 0; + private int _expectedMessage = 1; + + public void onMessage(Message message) + { + if(_failed || _awaitCompletion.getCount() == 0L) + { + //don't process anything else + return; + } + + _totalNumDeliveries++; + + if (message == null) + { + failAsyncTest("Should not get null messages"); + return; + } + + try + { + int msgId = message.getIntProperty("count"); + + _logger.info("Received message: " + msgId); + + //check the message is the one we expected + if(_expectedMessage != msgId) + { + failAsyncTest("Expected message " + _expectedMessage + " , got message " + msgId); + return; + } + + _expectedMessage++; + + //keep track of the overall deliveries to ensure we don't see more than expected + if(_totalNumDeliveries > expectedTotalNumberOfDeliveries) + { + failAsyncTest("Expected total of " + expectedTotalNumberOfDeliveries + + " message deliveries, reached " + _totalNumDeliveries); + } + + //check if this message is one chosen to be rolled back / recovered + if(redeliverMsgs.contains(msgId)) + { + _numMsgsToBeRedelivered++; + + //check if next message is going to be rolled back / recovered too + if(redeliverMsgs.contains(msgId +1)) + { + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + //skip on to next message immediately + return; + case Session.CLIENT_ACKNOWLEDGE: + //skip on to next message immediately + return; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + //must recover session now or onMessage will ack, so + //just fall through the if + break; + } + } + + _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + + _logger.debug("ROLLBACK/RECOVER"); + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.rollback(); + break; + case Session.CLIENT_ACKNOWLEDGE: + //fall through + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + session.recover(); + break; + } + + if( _deliveryAttempts >= maxDeliveryCount) + { + //the client should have rejected the latest messages upon then + //above recover/rollback, adjust counts to compensate + _deliveryAttempts = 0; + } + else + { + //the message(s) should be redelivered, adjust expected message + _expectedMessage -= _numMsgsToBeRedelivered; + } + _logger.debug("XXX _expectedMessage: " + _expectedMessage + " _deliveryAttempts : " + _deliveryAttempts + " _numMsgsToBeRedelivered=" + _numMsgsToBeRedelivered); + //reset count of messages expected to be redelivered + _numMsgsToBeRedelivered = 0; + } + else + { + //consume the message + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.commit(); + break; + case Session.CLIENT_ACKNOWLEDGE: + message.acknowledge(); + break; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall-through + case Session.AUTO_ACKNOWLEDGE: + //do nothing, onMessage will ack on exit. + break; + } + } + + if (msgId == MSG_COUNT) + { + //if this is the last message let the test complete. + if (expectedTotalNumberOfDeliveries == _totalNumDeliveries) + { + _awaitCompletion.countDown(); + } + else + { + failAsyncTest("Last message received, but we have not had the " + + "expected number of total delivieres. Received " + _totalNumDeliveries + " Expecting : " + expectedTotalNumberOfDeliveries); + } + } + } + catch (JMSException e) + { + failAsyncTest(e.getMessage()); + } + } + }); + } + + private void failAsyncTest(String msg) + { + _logger.error("Failing test because: " + msg); + _failMsg = msg; + _failed = true; + _awaitCompletion.countDown(); + } + + private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, + final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException + { + if(deliveryMode == Session.AUTO_ACKNOWLEDGE + || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + { + fail("Max Delivery feature is not supported with this acknowledgement mode" + + "when using synchronous message delivery."); + } + + int _deliveryAttempts = 0; //number of times given message(s) have been seen + int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover + int _totalNumDeliveries = 0; + int _expectedMessage = 1; + + while(!_failed) + { + Message message = consumer.receive(1000); + + _totalNumDeliveries++; + + if (message == null) + { + fail("Should not get null messages"); + return; + } + + try + { + int msgId = message.getIntProperty("count"); + + _logger.info("Received message: " + msgId); + + //check the message is the one we expected + assertEquals("Unexpected message.", _expectedMessage, msgId); + + _expectedMessage++; + + //keep track of the overall deliveries to ensure we don't see more than expected + assertTrue("Exceeded expected total number of deliveries.", + _totalNumDeliveries <= expectedTotalNumberOfDeliveries ); + + //check if this message is one chosen to be rolled back / recovered + if(redeliverMsgs.contains(msgId)) + { + //keep track of the number of messages we will have redelivered + //upon rollback/recover + _numMsgsToBeRedelivered++; + + if(redeliverMsgs.contains(msgId +1)) + { + //next message is going to be rolled back / recovered too. + //skip ahead to it + continue; + } + + _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.rollback(); + break; + case Session.CLIENT_ACKNOWLEDGE: + session.recover(); + + //sleep then do a synchronous op to give the broker + //time to resend all the messages + Thread.sleep(500); + ((AMQSession) session).sync(); + break; + } + + if( _deliveryAttempts >= maxDeliveryCount) + { + //the client should have rejected the latest messages upon then + //above recover/rollback, adjust counts to compensate + _deliveryAttempts = 0; + } + else + { + //the message(s) should be redelivered, adjust expected message + _expectedMessage -= _numMsgsToBeRedelivered; + } + + //As we just rolled back / recovered, we must reset the + //count of messages expected to be redelivered + _numMsgsToBeRedelivered = 0; + } + else + { + //consume the message + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.commit(); + break; + case Session.CLIENT_ACKNOWLEDGE: + message.acknowledge(); + break; + } + } + + if (msgId == MSG_COUNT) + { + //if this is the last message let the test complete. + assertTrue("Last message received, but we have not had the " + + "expected number of total delivieres", + expectedTotalNumberOfDeliveries == _totalNumDeliveries); + + break; + } + } + catch (JMSException e) + { + fail(e.getMessage()); + } + } + } + + private boolean isDurSubTest() + { + return getTestQueueName().contains("DurableSubscription"); + } +} diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index da702f96cf..eb130b27b7 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -57,6 +57,9 @@ org.apache.qpid.server.queue.ConflationQueueTest#* // the 0-10 c++ broker does not implement sorted queues org.apache.qpid.server.queue.SortedQueueTest#* +// the 0-10 c++ broker does not implement DLQ +org.apache.qpid.test.unit.client.MaxDeliveryCountTest#* + //this test checks explicitly for 0-8 flow control semantics org.apache.qpid.test.client.FlowControlTest#* |