From 8b8ddcaa974de4387e3dd84d12a96435ceaf16b3 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Tue, 7 Dec 2010 20:38:28 +0000 Subject: QPID-2973: ignore the Immediate status of the message when enqueing on the DLQ, force it to be enqueued regardless git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1043194 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 6 +--- .../org/apache/qpid/server/queue/AMQQueue.java | 5 ++- .../apache/qpid/server/queue/SimpleAMQQueue.java | 9 ++++-- .../qpid/server/txn/DLQTransactionalContext.java | 35 +++++++++++++++++--- .../qpid/server/txn/LocalTransactionalContext.java | 37 +++++++++------------- .../org/apache/qpid/server/queue/MockAMQQueue.java | 5 +++ 6 files changed, 60 insertions(+), 37 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index b45c26d22d..1897d37365 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1113,8 +1113,6 @@ public class AMQChannel AMQQueue queue = rejectedQueueEntry.getQueue(); Exchange altExchange = queue.getAlternateExchange(); - //TODO:remove below line, its temporary for some noddy testing only -// altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test")); if (altExchange == null) { _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); @@ -1150,9 +1148,7 @@ public class AMQChannel store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId()); } } - - //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued - + //configure the txn context to ack consumption from old queue upon commit unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 79aba8e550..e33bc2e6ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,14 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.subscription.Subscription; @@ -94,6 +91,8 @@ public interface AMQQueue extends Managable, Comparable QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; + + QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException; void requeue(QueueEntry entry) throws AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 209e2eafe4..9edb97feec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -354,8 +354,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } // ------ Enqueue / Dequeue - + public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + { + return enqueue(storeContext, message, false); + } + + public QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException { incrementQueueCount(); @@ -442,7 +447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - if (entry.immediateAndNotDelivered()) + if (entry.immediateAndNotDelivered() && !ignoreImmediate) { dequeue(storeContext, entry); entry.dispose(storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java index 78350d9eda..91ee259a11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java @@ -31,6 +31,32 @@ import org.apache.qpid.server.store.StoreContext; public class DLQTransactionalContext extends LocalTransactionalContext { private final StoreContext _storeContext; + + private class DLQPublishAction implements DeliveryAction + { + private final AMQQueue _queue; + private final AMQMessage _message; + + public DLQPublishAction(final AMQQueue queue, final AMQMessage message) + { + _queue = queue; + _message = message; + } + + public void process() throws AMQException + { + _message.incrementReference(); + try + { + //enqueue, ignoring whether the message is immediate + _queue.enqueue(getStoreContext(),_message, true); + } + finally + { + _message.decrementReference(getStoreContext()); + } + } + } public DLQTransactionalContext(final AMQChannel channel, final StoreContext storeContext) { @@ -39,15 +65,14 @@ public class DLQTransactionalContext extends LocalTransactionalContext } @Override - public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException + public StoreContext getStoreContext() { - //TODO: ensure message is not Immediate. Copy the message object if necessary. - deliver(queue, message, true); + return _storeContext; } @Override - public StoreContext getStoreContext() + protected DeliveryAction createPublishAction(AMQQueue queue, AMQMessage message) { - return _storeContext; + return new DLQPublishAction(queue, message); } } \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index f487774c65..1e0536d738 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -58,14 +58,12 @@ public class LocalTransactionalContext implements TransactionalContext private long _txnStartTime; - private abstract class DeliveryAction + protected interface DeliveryAction { - - abstract public void process() throws AMQException; - + void process() throws AMQException; } - private class RequeueAction extends DeliveryAction + private class RequeueAction implements DeliveryAction { public QueueEntry entry; @@ -80,17 +78,15 @@ public class LocalTransactionalContext implements TransactionalContext } } - private class PublishAction extends DeliveryAction + private class PublishAction implements DeliveryAction { private final AMQQueue _queue; private final AMQMessage _message; - private final boolean _enqueueOnly; - public PublishAction(final AMQQueue queue, final AMQMessage message, boolean enqueueOnly) + public PublishAction(final AMQQueue queue, final AMQMessage message) { _queue = queue; _message = message; - _enqueueOnly = enqueueOnly; } public void process() throws AMQException @@ -100,14 +96,11 @@ public class LocalTransactionalContext implements TransactionalContext try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); - if(!_enqueueOnly) - { - _queue.checkCapacity(_channel); + _queue.checkCapacity(_channel); - if(entry.immediateAndNotDelivered()) - { - getReturnMessages().add(new NoConsumersException(_message)); - } + if(entry.immediateAndNotDelivered()) + { + getReturnMessages().add(new NoConsumersException(_message)); } } finally @@ -117,6 +110,11 @@ public class LocalTransactionalContext implements TransactionalContext } } + protected DeliveryAction createPublishAction(AMQQueue queue, AMQMessage message) + { + return new PublishAction(queue, message); + } + public LocalTransactionalContext(final AMQChannel channel) { _channel = channel; @@ -155,18 +153,13 @@ public class LocalTransactionalContext implements TransactionalContext } public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException - { - deliver(queue, message, false); - } - - protected void deliver(final AMQQueue queue, AMQMessage message, boolean enqueueOnly) throws AMQException { // A publication will result in the enlisting of several // TxnOps. The first is an op that will store the message. // Following that (and ordering is important), an op will // be added for every queue onto which the message is // enqueued. - _postCommitDeliveryList.add(new PublishAction(queue, message, enqueueOnly)); + _postCommitDeliveryList.add(createPublishAction(queue, message)); _messageDelivered = true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 42164953a4..3daa049b24 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -167,6 +167,11 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } + public QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void requeue(QueueEntry entry) throws AMQException { //To change body of implemented methods use File | Settings | File Templates. -- cgit v1.2.1