summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-12-07 20:38:28 +0000
committerRobert Gemmell <robbie@apache.org>2010-12-07 20:38:28 +0000
commit8b8ddcaa974de4387e3dd84d12a96435ceaf16b3 (patch)
tree13ae547eb3678bdec4ed79bc349b9adb3408c116
parent7af690b42ee87047514804629a01acb052d60c11 (diff)
downloadqpid-python-8b8ddcaa974de4387e3dd84d12a96435ceaf16b3.tar.gz
QPID-2973: ignore the Immediate status of the message when enqueing on the DLQ, force it to be enqueued regardless
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1043194 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java37
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java5
6 files changed, 60 insertions, 37 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index b45c26d22d..1897d37365 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1113,8 +1113,6 @@ public class AMQChannel
AMQQueue queue = rejectedQueueEntry.getQueue();
Exchange altExchange = queue.getAlternateExchange();
- //TODO:remove below line, its temporary for some noddy testing only
-// altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test"));
if (altExchange == null)
{
_log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
@@ -1150,9 +1148,7 @@ public class AMQChannel
store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId());
}
}
-
- //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued
-
+
//configure the txn context to ack consumption from old queue upon commit
unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 79aba8e550..e33bc2e6ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -20,14 +20,11 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.subscription.Subscription;
@@ -94,6 +91,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+
+ QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException;
void requeue(QueueEntry entry) throws AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 209e2eafe4..9edb97feec 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -354,9 +354,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
// ------ Enqueue / Dequeue
-
+
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
+ return enqueue(storeContext, message, false);
+ }
+
+ public QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException
+ {
incrementQueueCount();
incrementQueueSize(message);
@@ -442,7 +447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- if (entry.immediateAndNotDelivered())
+ if (entry.immediateAndNotDelivered() && !ignoreImmediate)
{
dequeue(storeContext, entry);
entry.dispose(storeContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
index 78350d9eda..91ee259a11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
@@ -31,6 +31,32 @@ import org.apache.qpid.server.store.StoreContext;
public class DLQTransactionalContext extends LocalTransactionalContext
{
private final StoreContext _storeContext;
+
+ private class DLQPublishAction implements DeliveryAction
+ {
+ private final AMQQueue _queue;
+ private final AMQMessage _message;
+
+ public DLQPublishAction(final AMQQueue queue, final AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+ public void process() throws AMQException
+ {
+ _message.incrementReference();
+ try
+ {
+ //enqueue, ignoring whether the message is immediate
+ _queue.enqueue(getStoreContext(),_message, true);
+ }
+ finally
+ {
+ _message.decrementReference(getStoreContext());
+ }
+ }
+ }
public DLQTransactionalContext(final AMQChannel channel, final StoreContext storeContext)
{
@@ -39,15 +65,14 @@ public class DLQTransactionalContext extends LocalTransactionalContext
}
@Override
- public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
+ public StoreContext getStoreContext()
{
- //TODO: ensure message is not Immediate. Copy the message object if necessary.
- deliver(queue, message, true);
+ return _storeContext;
}
@Override
- public StoreContext getStoreContext()
+ protected DeliveryAction createPublishAction(AMQQueue queue, AMQMessage message)
{
- return _storeContext;
+ return new DLQPublishAction(queue, message);
}
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index f487774c65..1e0536d738 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -58,14 +58,12 @@ public class LocalTransactionalContext implements TransactionalContext
private long _txnStartTime;
- private abstract class DeliveryAction
+ protected interface DeliveryAction
{
-
- abstract public void process() throws AMQException;
-
+ void process() throws AMQException;
}
- private class RequeueAction extends DeliveryAction
+ private class RequeueAction implements DeliveryAction
{
public QueueEntry entry;
@@ -80,17 +78,15 @@ public class LocalTransactionalContext implements TransactionalContext
}
}
- private class PublishAction extends DeliveryAction
+ private class PublishAction implements DeliveryAction
{
private final AMQQueue _queue;
private final AMQMessage _message;
- private final boolean _enqueueOnly;
- public PublishAction(final AMQQueue queue, final AMQMessage message, boolean enqueueOnly)
+ public PublishAction(final AMQQueue queue, final AMQMessage message)
{
_queue = queue;
_message = message;
- _enqueueOnly = enqueueOnly;
}
public void process() throws AMQException
@@ -100,14 +96,11 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
- if(!_enqueueOnly)
- {
- _queue.checkCapacity(_channel);
+ _queue.checkCapacity(_channel);
- if(entry.immediateAndNotDelivered())
- {
- getReturnMessages().add(new NoConsumersException(_message));
- }
+ if(entry.immediateAndNotDelivered())
+ {
+ getReturnMessages().add(new NoConsumersException(_message));
}
}
finally
@@ -117,6 +110,11 @@ public class LocalTransactionalContext implements TransactionalContext
}
}
+ protected DeliveryAction createPublishAction(AMQQueue queue, AMQMessage message)
+ {
+ return new PublishAction(queue, message);
+ }
+
public LocalTransactionalContext(final AMQChannel channel)
{
_channel = channel;
@@ -156,17 +154,12 @@ public class LocalTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
- deliver(queue, message, false);
- }
-
- protected void deliver(final AMQQueue queue, AMQMessage message, boolean enqueueOnly) throws AMQException
- {
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
// Following that (and ordering is important), an op will
// be added for every queue onto which the message is
// enqueued.
- _postCommitDeliveryList.add(new PublishAction(queue, message, enqueueOnly));
+ _postCommitDeliveryList.add(createPublishAction(queue, message));
_messageDelivered = true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 42164953a4..3daa049b24 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -167,6 +167,11 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public QueueEntry enqueue(StoreContext storeContext, AMQMessage message, boolean ignoreImmediate) throws AMQException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void requeue(QueueEntry entry) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.