diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:24:44 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:24:44 +0000 |
| commit | e4ea6e8536ac8dea773ade50cc2872ef6ac4f298 (patch) | |
| tree | 531822db00ccb8cc5b699aeb9947542dfbc016ff /qpid/java | |
| parent | 141971d214792d5ec8dd3a8b4aa1a5d1ae7a77e9 (diff) | |
| download | qpid-python-e4ea6e8536ac8dea773ade50cc2872ef6ac4f298.tar.gz | |
QPID-2973: broker support for rejecting messages without requeue, and creating+using DLQs
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1042999 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
27 files changed, 441 insertions, 73 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index 78ae09426d..c07fa5af1d 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -34,7 +34,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.junit.extensions.util.SizeOf; @@ -204,7 +204,7 @@ public class DiagnosticExchange extends AbstractExchange return false; } - public void route(IncomingMessage payload) throws AMQException + public void route(InboundMessage payload) throws AMQException { Long value = new Long(SizeOf.getUsedMemory()); diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java index e43bd2ddc0..21e05fd981 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java @@ -28,7 +28,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -102,7 +102,7 @@ public class TestExchange implements Exchange { } - public void route(IncomingMessage message) throws AMQException + public void route(InboundMessage message) throws AMQException { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 129d40d530..b45c26d22d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; @@ -44,12 +45,14 @@ import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.DLQTransactionalContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -1092,4 +1095,75 @@ public class AMQChannel } } } + + public void deadLetter(long deliveryTag) throws AMQException + { + UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); + QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + + if (rejectedQueueEntry == null) + { + _log.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); + return; + } + else + { + AMQMessage msg = rejectedQueueEntry.getMessage(); + + AMQQueue queue = rejectedQueueEntry.getQueue(); + Exchange altExchange = queue.getAlternateExchange(); + + //TODO:remove below line, its temporary for some noddy testing only +// altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test")); + if (altExchange == null) + { + _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + rejectedQueueEntry.discard(new StoreContext()); + return; + } + + InboundMessageAdapter adapter = new InboundMessageAdapter(msg); + altExchange.route(adapter); + + ArrayList<AMQQueue> destinationQueues = adapter.getEnqueuedList(); + if (destinationQueues == null || destinationQueues.isEmpty()) + { + _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); + rejectedQueueEntry.discard(new StoreContext()); + return; + } + + //increment the message reference count to include the new queue(s) + msg.incrementReference(destinationQueues.size()); + + //create a new storeContext to use with a new TransactionContext for the DLQ process + StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag); + DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext); + + //enqueue the message on the new queues in the store if its persistent + if (msg.isPersistent()) + { + MessageStore store = getMessageStore(); + + for (int i = 0; i < destinationQueues.size(); i++) + { + store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId()); + } + } + + //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued + + //configure the txn context to ack consumption from old queue upon commit + unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext); + + //configure the txn context to deliver to the new queues following commit + for (int i = 0; i < destinationQueues.size(); i++) + { + dlqTxnContext.deliver(destinationQueues.get(i), msg); + } + + dlqTxnContext.commit(); + + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java new file mode 100644 index 0000000000..0d27637aec --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.util.ArrayList; +import java.util.Set; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.InboundMessage; +import org.apache.qpid.server.queue.QueueEntry; + +public class InboundMessageAdapter implements InboundMessage +{ + private AMQMessage _message; + private ArrayList<AMQQueue> _queues; + + public InboundMessageAdapter(AMQMessage msg) + { + _message = msg; + } + + public Long getMessageId() + { + return _message.getMessageId(); + } + + public boolean isPersistent() + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _message.isRedelivered(); + } + + public AMQShortString getRoutingKey() throws AMQException + { + return _message.getRoutingKey(); + } + + public ContentHeaderBody getContentHeaderBody() + { + try + { + return _message.getContentHeaderBody(); + } + catch (AMQException e) + { + throw new RuntimeException("Error retrieving ContentHeaderBody: " + e, e); + } + } + + public void enqueue(ArrayList<AMQQueue> queues) + { + _queues = queues; + } + + public ArrayList<AMQQueue> getEnqueuedList() + { + return _queues; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 8e00539025..a87bef0990 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -36,6 +36,8 @@ import org.apache.commons.configuration.HierarchicalConfiguration; import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -694,4 +696,20 @@ public class ServerConfiguration implements SignalHandler { return getConfig().getBoolean("statistics.reporting.reset", false); } + + /** + * String to affix to end of queue name when generating an alternate exchange for DLQ purposes. + */ + public String getDeadLetterExchangeSuffix() + { + return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + } + + /** + * String to affix to end of queue name when generating a queue for DLQ purposes. + */ + public String getDeadLetterQueueSuffix() + { + return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 620799a81f..b8affcfb53 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); + public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>(); private final VirtualHost _host; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 7f80971a00..a985ad6b6f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -227,7 +227,7 @@ public class DirectExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public void route(InboundMessage payload) throws AMQException { final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 06209c5458..8b74c9f6e9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -54,7 +55,7 @@ public interface Exchange void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - void route(IncomingMessage message) throws AMQException; + void route(InboundMessage message) throws AMQException; /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index f69a741dec..17ea60c524 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -28,7 +28,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -227,7 +227,7 @@ public class FanoutExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public void route(InboundMessage payload) throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 3c3204a59e..8f918e557f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -286,7 +287,7 @@ public class HeadersExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public void route(InboundMessage payload) throws AMQException { FieldTable headers = getHeaders(payload.getContentHeaderBody()); if (_logger.isDebugEnabled()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 56f774c9ae..fd37cf188d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -30,6 +30,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortStringTokenizer; +import org.apache.qpid.server.queue.InboundMessage; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -265,7 +266,7 @@ public class TopicExchange extends AbstractExchange _filteredQueues.put(queue,newFilters); } - public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) + public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues) { if(queues == null) { @@ -501,7 +502,7 @@ public class TopicExchange extends AbstractExchange if(selectorRef == null || (selector = selectorRef.get())==null) { - selector = new JMSSelectorFilter<RuntimeException>(selectorString); + selector = new JMSSelectorFilter(selectorString); _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector)); } return selector; @@ -563,7 +564,7 @@ public class TopicExchange extends AbstractExchange return normalizedString; } - public void route(IncomingMessage payload) throws AMQException + public void route(InboundMessage payload) throws AMQException { final AMQShortString routingKey = payload.getRoutingKey(); @@ -681,7 +682,7 @@ public class TopicExchange extends AbstractExchange } } - private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey) + private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey) { Collection<TopicMatcherResult> results = _parser.parse(routingKey); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index fcf3fd4337..09961cb711 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting:" + body.getDeliveryTag() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } @@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (message == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); -// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { if (message.isQueueDeleted()) { - _logger.warn("Message's Queue as already been purged, unable to Reject. " + - "Dropping message should use Dead Letter Queue"); + _logger.warn("Message's Queue has already been purged, dropping message"); message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { message.discard(channel.getStoreContext()); } - //sendtoDeadLetterQueue(msg) return; } if (!message.getMessage().isReferenced()) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("Message has already been purged, unable to Reject."); return; } @@ -98,15 +94,10 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } - // If we haven't requested message to be resent to this consumer then reject it from ever getting it. - //if (!evt.getMethod().resend) - { - message.reject(); - } + message.reject(); if (body.getRequeue()) { @@ -114,11 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - //sendtoDeadLetterQueue(AMQMessage message) -// message.queue = channel.getDefaultDeadLetterQueue(); -// channel.requeue(deliveryTag); + channel.deadLetter(body.getDeliveryTag()); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b987dae16d..3e7092cd56 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * A deliverable message. */ -public class AMQMessage implements Filterable<AMQException> +public class AMQMessage implements Filterable { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -273,7 +274,10 @@ public class AMQMessage implements Filterable<AMQException> return _messageHandle.getContentHeaderBody(getStoreContext()); } - + public AMQShortString getRoutingKey() throws AMQException + { + return getMessagePublishInfo().getRoutingKey(); + } public Long getMessageId() { @@ -373,7 +377,7 @@ public class AMQMessage implements Filterable<AMQException> return (_flags & DELIVERED_TO_CONSUMER) != 0; } - public boolean isPersistent() throws AMQException + public boolean isPersistent() { return _messageHandle.isPersistent(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f169decbb9..b92e7db02e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -95,7 +95,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; - void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; + void requeue(QueueEntry entry) throws AMQException; void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; @@ -248,4 +248,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void configure(QueueConfiguration config); ManagedObject getManagedObject(); + + Exchange getAlternateExchange(); + + void setAlternateExchange(Exchange exchange); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index c6dcb1cd01..5f5c6fe1a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -21,17 +21,23 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Map; -import java.util.HashMap; - - public class AMQQueueFactory { + public static final boolean CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED = true;//TODO: take from queue configuration + public static final AMQShortString DLQ_ROUTING_KEY = new AMQShortString("dlq"); + public static final AMQShortString X_QPID_DLQ_ENABLED = new AMQShortString("x-qpid-dlq-enabled"); + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); private static final AMQShortString QPID_LAST_VALUE_QUEUE = new AMQShortString ("qpid.last_value_queue"); private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new AMQShortString("qpid.last_value_queue_key"); @@ -147,7 +153,6 @@ public class AMQQueueFactory } } - AMQQueue q = null; if(conflationKey != null) { @@ -164,7 +169,8 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); - q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + QueueConfiguration qConfig = virtualHost.getConfiguration().getQueueConfiguration(name.asString()); + q.configure(qConfig); if(arguments != null) { @@ -177,6 +183,74 @@ public class AMQQueueFactory } } + boolean dlqArgPresent = (arguments != null && (arguments.containsKey(X_QPID_DLQ_ENABLED))); + + if(dlqArgPresent || CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED) + { + //verify that the argument isn't explicitly disabling DLQ for this queue. + boolean dlqEnabled = true; + if(dlqArgPresent) + { + dlqEnabled = arguments.getBoolean(X_QPID_DLQ_ENABLED); + } + + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if(!q.isAutoDelete() && dlqEnabled) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + AMQShortString dlExchangeName = new AMQShortString(name + serverConfig.getDeadLetterExchangeSuffix()); + AMQShortString dlQueueName = new AMQShortString(name + serverConfig.getDeadLetterQueueSuffix()); + + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + Exchange dlExchange = null; + synchronized(exchangeRegistry) + { + dlExchange = exchangeRegistry.getExchange(dlExchangeName); + + if(dlExchange == null) + { + dlExchange = exchangeFactory.createExchange(dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + + exchangeRegistry.registerExchange(dlExchange); + + //enter the dle in the persistent store + virtualHost.getMessageStore().createExchange(dlExchange); + } + } + + AMQQueue dlQueue = null; + synchronized(queueRegistry) + { + dlQueue = queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ'ing from the DLQ itself, preventing loops etc + FieldTable args = new FieldTable(); + args.setBoolean(X_QPID_DLQ_ENABLED, false); + + dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, virtualHost, args); + + //enter the dlq in the persistent store + virtualHost.getMessageStore().createQueue(dlQueue, args); + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) + { + dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null); + } + + q.setAlternateExchange(dlExchange); + } + + } + return q; } @@ -212,6 +286,15 @@ public class AMQQueueFactory arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); } + if (!config.getAutoDelete() && CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED) + { + if(arguments == null) + { + arguments = new FieldTable(); + } + + arguments.setBoolean(X_QPID_DLQ_ENABLED, true); + } AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments); q.configure(config); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java new file mode 100644 index 0000000000..00381fb218 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.ArrayList; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +public interface InboundMessage extends Filterable<RuntimeException> +{ + AMQShortString getRoutingKey() throws AMQException; + + Long getMessageId(); + + void enqueue(ArrayList<AMQQueue> queues); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index d5e0b4d187..519e8bf50c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -35,9 +35,8 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; -public class IncomingMessage implements Filterable<RuntimeException> +public class IncomingMessage implements InboundMessage { /** Used for debugging purposes. */ @@ -279,7 +278,6 @@ public class IncomingMessage implements Filterable<RuntimeException> return _contentHeaderBody; } - public boolean isPersistent() { return getContentHeaderBody().properties instanceof BasicContentHeaderProperties && diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 5ba8957abb..84f7d00847 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -171,7 +171,7 @@ public interface QueueEntry extends Comparable<QueueEntry> boolean isRejectedBy(Subscription subscription); - void requeue(StoreContext storeContext) throws AMQException; + void requeue() throws AMQException; void dequeue(final StoreContext storeContext) throws FailedDequeueException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 7332e66f99..3f785c7aec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -260,9 +260,9 @@ public class QueueEntryImpl implements QueueEntry } - public void requeue(final StoreContext storeContext) throws AMQException + public void requeue() throws AMQException { - getQueue().requeue(storeContext, this); + getQueue().requeue(this); if(_stateChangeListeners != null) { notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ba5357ca01..03ff2c48b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -129,6 +129,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); private final AtomicBoolean _overfull = new AtomicBoolean(false); + private Exchange _alternateExchange; + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -554,7 +556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException + public void requeue(QueueEntry entry) throws AMQException { SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); @@ -1812,4 +1814,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return String.valueOf(getName()); } + + public Exchange getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(Exchange exchange) + { + _alternateExchange = exchange; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java new file mode 100644 index 0000000000..78350d9eda --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; + +/** A transactional context that only supports local transactions + * for use in DeadLetterQueue processing */ +public class DLQTransactionalContext extends LocalTransactionalContext +{ + private final StoreContext _storeContext; + + public DLQTransactionalContext(final AMQChannel channel, final StoreContext storeContext) + { + super(channel); + _storeContext = storeContext; + } + + @Override + public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException + { + //TODO: ensure message is not Immediate. Copy the message object if necessary. + deliver(queue, message, true); + } + + @Override + public StoreContext getStoreContext() + { + return _storeContext; + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index c20ce138f3..f487774c65 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -76,7 +76,7 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - entry.requeue(getStoreContext()); + entry.requeue(); } } @@ -84,11 +84,13 @@ public class LocalTransactionalContext implements TransactionalContext { private final AMQQueue _queue; private final AMQMessage _message; + private final boolean _enqueueOnly; - public PublishAction(final AMQQueue queue, final AMQMessage message) + public PublishAction(final AMQQueue queue, final AMQMessage message, boolean enqueueOnly) { _queue = queue; _message = message; + _enqueueOnly = enqueueOnly; } public void process() throws AMQException @@ -98,11 +100,14 @@ public class LocalTransactionalContext implements TransactionalContext try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); - _queue.checkCapacity(_channel); - - if(entry.immediateAndNotDelivered()) + if(!_enqueueOnly) { - getReturnMessages().add(new NoConsumersException(_message)); + _queue.checkCapacity(_channel); + + if(entry.immediateAndNotDelivered()) + { + getReturnMessages().add(new NoConsumersException(_message)); + } } } finally @@ -151,24 +156,26 @@ public class LocalTransactionalContext implements TransactionalContext public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { + deliver(queue, message, false); + } + + protected void deliver(final AMQQueue queue, AMQMessage message, boolean enqueueOnly) throws AMQException + { // A publication will result in the enlisting of several // TxnOps. The first is an op that will store the message. // Following that (and ordering is important), an op will // be added for every queue onto which the message is // enqueued. - _postCommitDeliveryList.add(new PublishAction(queue, message)); + _postCommitDeliveryList.add(new PublishAction(queue, message, enqueueOnly)); _messageDelivered = true; - } public void requeue(QueueEntry entry) throws AMQException { _postCommitDeliveryList.add(new RequeueAction(entry)); _messageDelivered = true; - } - private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { if (!unacknowledgedMessageMap.contains(deliveryTag)) @@ -255,10 +262,10 @@ public class LocalTransactionalContext implements TransactionalContext if (_ackOp != null) { - + //there are unacknowledged messages to commit delivery of to the client _messageDelivered = true; _ackOp.consolidate(); - // already enlisted, after commit will reset regardless of outcome + // ackOp has already enlisted in the txnBuffer, after commit will reset regardless of outcome _ackOp = null; } @@ -293,7 +300,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Performing post commit delivery"); + _log.debug("Beginning post commit delivery"); } try @@ -306,6 +313,7 @@ public class LocalTransactionalContext implements TransactionalContext finally { _postCommitDeliveryList.clear(); + _log.debug("Completed post commit delivery"); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index fdbda006d6..59a18e1b75 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -115,7 +115,7 @@ public class NonTransactionalContext implements TransactionalContext public void requeue(QueueEntry entry) throws AMQException { - entry.requeue(_storeContext); + entry.requeue(); } public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 2fa017fc64..80c642a734 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -349,14 +349,7 @@ public class Show extends AbstractCommand arrival.add("" + msg.getArrivalTime()); - try - { - ispersitent.add(msg.isPersistent() ? "true" : "false"); - } - catch (AMQException e) - { - ispersitent.add("n/a"); - } + ispersitent.add(msg.isPersistent() ? "true" : "false"); isredelivered.add(msg.isRedelivered() ? "true" : "false"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 5fcf829c19..250b7470a4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -367,7 +367,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(StoreContext storeContext) throws AMQException + public void requeue() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b642433134..eedfbb403e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -167,7 +167,7 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException + public void requeue(QueueEntry entry) throws AMQException { //To change body of implemented methods use File | Settings | File Templates. } @@ -376,4 +376,14 @@ public class MockAMQQueue implements AMQQueue { return false; } + + public Exchange getAlternateExchange() + { + return null; + } + + public void setAlternateExchange(Exchange exchange) + { + + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 8a0ee55e22..ae720d6153 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -167,7 +167,7 @@ public class MockQueueEntry implements QueueEntry } - public void requeue(StoreContext storeContext) throws AMQException + public void requeue() throws AMQException { |
