summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-12-07 12:24:44 +0000
committerRobert Gemmell <robbie@apache.org>2010-12-07 12:24:44 +0000
commite4ea6e8536ac8dea773ade50cc2872ef6ac4f298 (patch)
tree531822db00ccb8cc5b699aeb9947542dfbc016ff /qpid/java
parent141971d214792d5ec8dd3a8b4aa1a5d1ae7a77e9 (diff)
downloadqpid-python-e4ea6e8536ac8dea773ade50cc2872ef6ac4f298.tar.gz
QPID-2973: broker support for rejecting messages without requeue, and creating+using DLQs
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1042999 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java4
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java74
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java85
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java95
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java2
27 files changed, 441 insertions, 73 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index 78ae09426d..c07fa5af1d 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -34,7 +34,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -204,7 +204,7 @@ public class DiagnosticExchange extends AbstractExchange
return false;
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
index e43bd2ddc0..21e05fd981 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
@@ -28,7 +28,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -102,7 +102,7 @@ public class TestExchange implements Exchange
{
}
- public void route(IncomingMessage message) throws AMQException
+ public void route(InboundMessage message) throws AMQException
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 129d40d530..b45c26d22d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,12 +45,14 @@ import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.DLQTransactionalContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -1092,4 +1095,75 @@ public class AMQChannel
}
}
}
+
+ public void deadLetter(long deliveryTag) throws AMQException
+ {
+ UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+ QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+ if (rejectedQueueEntry == null)
+ {
+ _log.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+ return;
+ }
+ else
+ {
+ AMQMessage msg = rejectedQueueEntry.getMessage();
+
+ AMQQueue queue = rejectedQueueEntry.getQueue();
+ Exchange altExchange = queue.getAlternateExchange();
+
+ //TODO:remove below line, its temporary for some noddy testing only
+// altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test"));
+ if (altExchange == null)
+ {
+ _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ rejectedQueueEntry.discard(new StoreContext());
+ return;
+ }
+
+ InboundMessageAdapter adapter = new InboundMessageAdapter(msg);
+ altExchange.route(adapter);
+
+ ArrayList<AMQQueue> destinationQueues = adapter.getEnqueuedList();
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+ rejectedQueueEntry.discard(new StoreContext());
+ return;
+ }
+
+ //increment the message reference count to include the new queue(s)
+ msg.incrementReference(destinationQueues.size());
+
+ //create a new storeContext to use with a new TransactionContext for the DLQ process
+ StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag);
+ DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext);
+
+ //enqueue the message on the new queues in the store if its persistent
+ if (msg.isPersistent())
+ {
+ MessageStore store = getMessageStore();
+
+ for (int i = 0; i < destinationQueues.size(); i++)
+ {
+ store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId());
+ }
+ }
+
+ //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued
+
+ //configure the txn context to ack consumption from old queue upon commit
+ unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext);
+
+ //configure the txn context to deliver to the new queues following commit
+ for (int i = 0; i < destinationQueues.size(); i++)
+ {
+ dlqTxnContext.deliver(destinationQueues.get(i), msg);
+ }
+
+ dlqTxnContext.commit();
+
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
new file mode 100644
index 0000000000..0d27637aec
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.InboundMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public class InboundMessageAdapter implements InboundMessage
+{
+ private AMQMessage _message;
+ private ArrayList<AMQQueue> _queues;
+
+ public InboundMessageAdapter(AMQMessage msg)
+ {
+ _message = msg;
+ }
+
+ public Long getMessageId()
+ {
+ return _message.getMessageId();
+ }
+
+ public boolean isPersistent()
+ {
+ return _message.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return _message.isRedelivered();
+ }
+
+ public AMQShortString getRoutingKey() throws AMQException
+ {
+ return _message.getRoutingKey();
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ try
+ {
+ return _message.getContentHeaderBody();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error retrieving ContentHeaderBody: " + e, e);
+ }
+ }
+
+ public void enqueue(ArrayList<AMQQueue> queues)
+ {
+ _queues = queues;
+ }
+
+ public ArrayList<AMQQueue> getEnqueuedList()
+ {
+ return _queues;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 8e00539025..a87bef0990 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -36,6 +36,8 @@ import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -694,4 +696,20 @@ public class ServerConfiguration implements SignalHandler
{
return getConfig().getBoolean("statistics.reporting.reset", false);
}
+
+ /**
+ * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+ */
+ public String getDeadLetterExchangeSuffix()
+ {
+ return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+ /**
+ * String to affix to end of queue name when generating a queue for DLQ purposes.
+ */
+ public String getDeadLetterQueueSuffix()
+ {
+ return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 620799a81f..b8affcfb53 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+ public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
private final VirtualHost _host;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 7f80971a00..a985ad6b6f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class DirectExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 06209c5458..8b74c9f6e9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -54,7 +55,7 @@ public interface Exchange
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- void route(IncomingMessage message) throws AMQException;
+ void route(InboundMessage message) throws AMQException;
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index f69a741dec..17ea60c524 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -28,7 +28,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class FanoutExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 3c3204a59e..8f918e557f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -286,7 +287,7 @@ public class HeadersExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
FieldTable headers = getHeaders(payload.getContentHeaderBody());
if (_logger.isDebugEnabled())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 56f774c9ae..fd37cf188d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -30,6 +30,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -265,7 +266,7 @@ public class TopicExchange extends AbstractExchange
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
@@ -501,7 +502,7 @@ public class TopicExchange extends AbstractExchange
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+ selector = new JMSSelectorFilter(selectorString);
_selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
}
return selector;
@@ -563,7 +564,7 @@ public class TopicExchange extends AbstractExchange
return normalizedString;
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
final AMQShortString routingKey = payload.getRoutingKey();
@@ -681,7 +682,7 @@ public class TopicExchange extends AbstractExchange
}
}
- private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index fcf3fd4337..09961cb711 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting:" + body.getDeliveryTag() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (message == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
if (message.isQueueDeleted())
{
- _logger.warn("Message's Queue as already been purged, unable to Reject. " +
- "Dropping message should use Dead Letter Queue");
+ _logger.warn("Message's Queue has already been purged, dropping message");
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
message.discard(channel.getStoreContext());
}
- //sendtoDeadLetterQueue(msg)
return;
}
if (!message.getMessage().isReferenced())
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("Message has already been purged, unable to Reject.");
return;
}
@@ -98,15 +94,10 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
- // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
- //if (!evt.getMethod().resend)
- {
- message.reject();
- }
+ message.reject();
if (body.getRequeue())
{
@@ -114,11 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
else
{
- _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- //sendtoDeadLetterQueue(AMQMessage message)
-// message.queue = channel.getDefaultDeadLetterQueue();
-// channel.requeue(deliveryTag);
+ channel.deadLetter(body.getDeliveryTag());
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b987dae16d..3e7092cd56 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* A deliverable message.
*/
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -273,7 +274,10 @@ public class AMQMessage implements Filterable<AMQException>
return _messageHandle.getContentHeaderBody(getStoreContext());
}
-
+ public AMQShortString getRoutingKey() throws AMQException
+ {
+ return getMessagePublishInfo().getRoutingKey();
+ }
public Long getMessageId()
{
@@ -373,7 +377,7 @@ public class AMQMessage implements Filterable<AMQException>
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
- public boolean isPersistent() throws AMQException
+ public boolean isPersistent()
{
return _messageHandle.isPersistent();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f169decbb9..b92e7db02e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -95,7 +95,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+ void requeue(QueueEntry entry) throws AMQException;
void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
@@ -248,4 +248,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void configure(QueueConfiguration config);
ManagedObject getManagedObject();
+
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index c6dcb1cd01..5f5c6fe1a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -21,17 +21,23 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Map;
-import java.util.HashMap;
-
-
public class AMQQueueFactory
{
+ public static final boolean CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED = true;//TODO: take from queue configuration
+ public static final AMQShortString DLQ_ROUTING_KEY = new AMQShortString("dlq");
+ public static final AMQShortString X_QPID_DLQ_ENABLED = new AMQShortString("x-qpid-dlq-enabled");
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
private static final AMQShortString QPID_LAST_VALUE_QUEUE = new AMQShortString ("qpid.last_value_queue");
private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new AMQShortString("qpid.last_value_queue_key");
@@ -147,7 +153,6 @@ public class AMQQueueFactory
}
}
-
AMQQueue q = null;
if(conflationKey != null)
{
@@ -164,7 +169,8 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+ QueueConfiguration qConfig = virtualHost.getConfiguration().getQueueConfiguration(name.asString());
+ q.configure(qConfig);
if(arguments != null)
{
@@ -177,6 +183,74 @@ public class AMQQueueFactory
}
}
+ boolean dlqArgPresent = (arguments != null && (arguments.containsKey(X_QPID_DLQ_ENABLED)));
+
+ if(dlqArgPresent || CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+ {
+ //verify that the argument isn't explicitly disabling DLQ for this queue.
+ boolean dlqEnabled = true;
+ if(dlqArgPresent)
+ {
+ dlqEnabled = arguments.getBoolean(X_QPID_DLQ_ENABLED);
+ }
+
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if(!q.isAutoDelete() && dlqEnabled)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ AMQShortString dlExchangeName = new AMQShortString(name + serverConfig.getDeadLetterExchangeSuffix());
+ AMQShortString dlQueueName = new AMQShortString(name + serverConfig.getDeadLetterQueueSuffix());
+
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange = exchangeFactory.createExchange(dlExchangeName,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+ virtualHost.getMessageStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing from the DLQ itself, preventing loops etc
+ FieldTable args = new FieldTable();
+ args.setBoolean(X_QPID_DLQ_ENABLED, false);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, virtualHost, args);
+
+ //enter the dlq in the persistent store
+ virtualHost.getMessageStore().createQueue(dlQueue, args);
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null);
+ }
+
+ q.setAlternateExchange(dlExchange);
+ }
+
+ }
+
return q;
}
@@ -212,6 +286,15 @@ public class AMQQueueFactory
arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
}
+ if (!config.getAutoDelete() && CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+
+ arguments.setBoolean(X_QPID_DLQ_ENABLED, true);
+ }
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
q.configure(config);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
new file mode 100644
index 0000000000..00381fb218
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public interface InboundMessage extends Filterable<RuntimeException>
+{
+ AMQShortString getRoutingKey() throws AMQException;
+
+ Long getMessageId();
+
+ void enqueue(ArrayList<AMQQueue> queues);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index d5e0b4d187..519e8bf50c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -35,9 +35,8 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements InboundMessage
{
/** Used for debugging purposes. */
@@ -279,7 +278,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
return _contentHeaderBody;
}
-
public boolean isPersistent()
{
return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 5ba8957abb..84f7d00847 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -171,7 +171,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isRejectedBy(Subscription subscription);
- void requeue(StoreContext storeContext) throws AMQException;
+ void requeue() throws AMQException;
void dequeue(final StoreContext storeContext) throws FailedDequeueException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 7332e66f99..3f785c7aec 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -260,9 +260,9 @@ public class QueueEntryImpl implements QueueEntry
}
- public void requeue(final StoreContext storeContext) throws AMQException
+ public void requeue() throws AMQException
{
- getQueue().requeue(storeContext, this);
+ getQueue().requeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ba5357ca01..03ff2c48b9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -129,6 +129,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private Exchange _alternateExchange;
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -554,7 +556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ public void requeue(QueueEntry entry) throws AMQException
{
SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
@@ -1812,4 +1814,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return String.valueOf(getName());
}
+
+ public Exchange getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ _alternateExchange = exchange;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
new file mode 100644
index 0000000000..78350d9eda
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/** A transactional context that only supports local transactions
+ * for use in DeadLetterQueue processing */
+public class DLQTransactionalContext extends LocalTransactionalContext
+{
+ private final StoreContext _storeContext;
+
+ public DLQTransactionalContext(final AMQChannel channel, final StoreContext storeContext)
+ {
+ super(channel);
+ _storeContext = storeContext;
+ }
+
+ @Override
+ public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
+ {
+ //TODO: ensure message is not Immediate. Copy the message object if necessary.
+ deliver(queue, message, true);
+ }
+
+ @Override
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index c20ce138f3..f487774c65 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -76,7 +76,7 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- entry.requeue(getStoreContext());
+ entry.requeue();
}
}
@@ -84,11 +84,13 @@ public class LocalTransactionalContext implements TransactionalContext
{
private final AMQQueue _queue;
private final AMQMessage _message;
+ private final boolean _enqueueOnly;
- public PublishAction(final AMQQueue queue, final AMQMessage message)
+ public PublishAction(final AMQQueue queue, final AMQMessage message, boolean enqueueOnly)
{
_queue = queue;
_message = message;
+ _enqueueOnly = enqueueOnly;
}
public void process() throws AMQException
@@ -98,11 +100,14 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
- _queue.checkCapacity(_channel);
-
- if(entry.immediateAndNotDelivered())
+ if(!_enqueueOnly)
{
- getReturnMessages().add(new NoConsumersException(_message));
+ _queue.checkCapacity(_channel);
+
+ if(entry.immediateAndNotDelivered())
+ {
+ getReturnMessages().add(new NoConsumersException(_message));
+ }
}
}
finally
@@ -151,24 +156,26 @@ public class LocalTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
+ deliver(queue, message, false);
+ }
+
+ protected void deliver(final AMQQueue queue, AMQMessage message, boolean enqueueOnly) throws AMQException
+ {
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
// Following that (and ordering is important), an op will
// be added for every queue onto which the message is
// enqueued.
- _postCommitDeliveryList.add(new PublishAction(queue, message));
+ _postCommitDeliveryList.add(new PublishAction(queue, message, enqueueOnly));
_messageDelivered = true;
-
}
public void requeue(QueueEntry entry) throws AMQException
{
_postCommitDeliveryList.add(new RequeueAction(entry));
_messageDelivered = true;
-
}
-
private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
@@ -255,10 +262,10 @@ public class LocalTransactionalContext implements TransactionalContext
if (_ackOp != null)
{
-
+ //there are unacknowledged messages to commit delivery of to the client
_messageDelivered = true;
_ackOp.consolidate();
- // already enlisted, after commit will reset regardless of outcome
+ // ackOp has already enlisted in the txnBuffer, after commit will reset regardless of outcome
_ackOp = null;
}
@@ -293,7 +300,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Performing post commit delivery");
+ _log.debug("Beginning post commit delivery");
}
try
@@ -306,6 +313,7 @@ public class LocalTransactionalContext implements TransactionalContext
finally
{
_postCommitDeliveryList.clear();
+ _log.debug("Completed post commit delivery");
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index fdbda006d6..59a18e1b75 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -115,7 +115,7 @@ public class NonTransactionalContext implements TransactionalContext
public void requeue(QueueEntry entry) throws AMQException
{
- entry.requeue(_storeContext);
+ entry.requeue();
}
public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 2fa017fc64..80c642a734 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -349,14 +349,7 @@ public class Show extends AbstractCommand
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
isredelivered.add(msg.isRedelivered() ? "true" : "false");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 5fcf829c19..250b7470a4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -367,7 +367,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b642433134..eedfbb403e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -167,7 +167,7 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ public void requeue(QueueEntry entry) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -376,4 +376,14 @@ public class MockAMQQueue implements AMQQueue
{
return false;
}
+
+ public Exchange getAlternateExchange()
+ {
+ return null;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 8a0ee55e22..ae720d6153 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -167,7 +167,7 @@ public class MockQueueEntry implements QueueEntry
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue() throws AMQException
{