summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java272
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java165
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java143
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java322
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java232
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java74
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java375
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java22
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java197
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/MessageQueue.java30
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java33
32 files changed, 1707 insertions, 609 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7271bd6e43..7ceb3a7eef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -74,28 +75,20 @@ public class AMQChannel
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /**
- * A channel has a default queue (the last declared) that is used when no queue name is
- * explictily set
- */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
- /**
- * This tag is unique per subscription to a queue. The server returns this in response to a
- * basic.consume request.
- */
+ /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
/**
- * The current message - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the message gets updated and once all
- * frames have been received the message can then be routed.
+ * The current message - which may be partial in the sense that not all frames have been received yet - which has
+ * been received by this channel. As the frames are received the message gets updated and once all frames have been
+ * received the message can then be routed.
*/
private AMQMessage _currentMessage;
- /**
- * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
- */
+ /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -109,8 +102,8 @@ public class AMQChannel
private TransactionalContext _txnContext;
/**
- * A context used by the message store enabling it to track context for a given channel even across
- * thread boundaries
+ * A context used by the message store enabling it to track context for a given channel even across thread
+ * boundaries
*/
private final StoreContext _storeContext;
@@ -123,7 +116,6 @@ public class AMQChannel
private final AMQProtocolSession _session;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -138,9 +130,7 @@ public class AMQChannel
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- /**
- * Sets this channel to be part of a local transaction
- */
+ /** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
_txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
@@ -293,17 +283,17 @@ public class AMQChannel
}
/**
- * Subscribe to a queue. We register all subscriptions in the channel so that
- * if the channel is closed we can clean up all subscriptions, even if the
- * client does not explicitly unsubscribe from all queues.
+ * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
+ * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
- * @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
- * @param session the protocol session of the subscriber
+ * @param tag the tag chosen by the client (if null, server will generate one)
+ * @param queue the queue to subscribe to
+ * @param session the protocol session of the subscriber
* @param noLocal
* @param exclusive
- * @return the consumer tag. This is returned to the subscriber and used in
- * subsequent unsubscribe requests
+ *
+ * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
+ *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
@@ -335,7 +325,7 @@ public class AMQChannel
}
/**
- * Called from the protocol session to close this channel and clean up.
+ * Called from the protocol session to close this channel and clean up. T
*
* @throws AMQException if there is an error during closure
*/
@@ -344,8 +334,6 @@ public class AMQChannel
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
-
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -362,8 +350,8 @@ public class AMQChannel
* Add a message to the channel-based list of unacknowledged messages
*
* @param message the message that was delivered
- * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
- * the delivery tag)
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
+ * delivery tag)
* @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
@@ -376,8 +364,8 @@ public class AMQChannel
}
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
- * May result in delivery to this same channel or to other subscribers.
+ * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * this same channel or to other subscribers.
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
@@ -386,23 +374,75 @@ public class AMQChannel
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- _txnContext.deliver(unacked.message, unacked.queue);
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
}
}
}
+ public void requeue(long deliveryTag) throws AMQException
+ {
+ UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
- /**
- * Called to resend all outstanding unacknowledged messages to this same channel.
- */
+ if (unacked != null)
+ {
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
+ unacked.message.decrementReference(_storeContext);
+ }
+ else
+ {
+ _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+ }
+
+
+ }
+
+
+ /** Called to resend all outstanding unacknowledged messages to this same channel. */
public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+ final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+ }
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
@@ -412,21 +452,40 @@ public class AMQChannel
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
+ if (consumerTag != null)
{
- msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msgToResend.add(message);
+ }
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
}
else
{
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (message.queue != null && (consumerTag == null || requeue))
+ if (message.queue != null)
+ {
+ if (requeue)
+ {
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ }
+ }
+ else
{
- msgToRequeue.add(message);
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
-
+
// false means continue processing
return false;
}
@@ -436,21 +495,112 @@ public class AMQChannel
}
});
- for(UnacknowledgedMessage message : msgToRequeue)
+ // Process Messages to Resend
+ if (_log.isInfoEnabled())
+ {
+ if (!msgToResend.isEmpty())
+ {
+ _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+ }
+ }
+ for (UnacknowledgedMessage message : msgToResend)
+ {
+ AMQMessage msg = message.message;
+
+ // Our Java Client will always suspend the channel when resending!!
+// if (isSuspended())
+// {
+// _log.info("Channel is suspended so requeuing");
+// //move this message to requeue
+// msgToRequeue.add(message);
+// }
+// else
+ {
+ //release to allow it to be delivered
+ msg.release();
+
+ // Without any details from the client about what has been processed we have to mark
+ // all messages in the unacked map as redelivered.
+ msg.setRedelivered(true);
+
+
+ Subscription sub = msg.getDeliveredSubscription();
+
+ if (sub != null)
+ {
+ synchronized (sub.getSendLock())
+ {
+ if (sub.isClosed())
+ {
+ _log.info("Subscription closed during resend so requeuing message");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
+ }
+ // Will throw an exception if the sub is closed
+ sub.addToResendQueue(msg);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ // Don't decrement as we are bypassing the normal deliver which increments
+ // this is what there is a decrement on the Requeue as deliver will increment.
+ // msg.decrementReference(_storeContext);
+ }
+ }
+ }
+ else
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ }
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ if (!msgToRequeue.isEmpty())
+ {
+ _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+ }
+ }
+
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
{
- _txnContext.deliver(message.message, message.queue);
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ // Process Messages to Requeue at the front of the queue
+ for (UnacknowledgedMessage message : msgToRequeue)
+ {
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(message.message, message.queue, true);
+ }
+ else
+ {
+ _txnContext.deliver(message.message, message.queue, true);
+ }
+
_unacknowledgedMessageMap.remove(message.deliveryTag);
message.message.decrementReference(_storeContext);
}
}
/**
- * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
- * messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item since we may get an ack for a delivery tag that was generated from the
- * deleted queue.
+ * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
+ * remove the queue reference and also decrement any message reference counts, without actually removing the item
+ * since we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue the queue that has been deleted
+ *
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
public void queueDeleted(final AMQQueue queue) throws AMQException
@@ -487,6 +637,7 @@ public class AMQChannel
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
+ *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
@@ -517,10 +668,10 @@ public class AMQChannel
private void checkSuspension()
{
boolean suspend;
-
- suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
-
+
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -570,8 +721,6 @@ public class AMQChannel
public void rollback() throws AMQException
{
_txnContext.rollback();
-
-
}
public String toString()
@@ -617,8 +766,8 @@ public class AMQChannel
}
else
{
- boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
- if(!willSuspend)
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
@@ -626,7 +775,7 @@ public class AMQChannel
}
- if(willSuspend)
+ if (willSuspend)
{
setSuspended(true);
}
@@ -634,4 +783,9 @@ public class AMQChannel
}
}
+
+ public TransactionalContext getTransactionalContext()
+ {
+ return _txnContext;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index f8b6babd43..fdf087fdea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -85,7 +85,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
for (UnacknowledgedMessage msg : msgs)
{
remove(msg.deliveryTag);
-
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
new file mode 100644
index 0000000000..ed13092ded
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
+
+ private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
+
+ public static BasicRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicRejectMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRejectBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
+
+ int channelId = evt.getChannelId();
+ UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
+
+ _logger.info("Need to reject message:" + message);
+// if (evt.getMethod().requeue)
+// {
+// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
+// }
+// else
+// {
+// // session.getChannel(channelId).resend(message);
+// }
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 8ce5a0ea73..a10f44f906 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -62,6 +62,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
channel.resend(session, false);
}
catch (AMQException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c60c22c4e4..aa7ea16afc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -36,21 +36,15 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /**
- * Used in clustering
- */
+ /** Used in clustering */
private Set<Object> _tokens;
- /**
- * Only use in clustering - should ideally be removed?
- */
+ /** Only use in clustering - should ideally be removed? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -63,16 +57,14 @@ public class AMQMessage
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a
- * consumer. Used in implementing return functionality for
+ * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
/**
- * We need to keep track of whether the message was 'immediate'
- * as in extreme circumstances, when the checkDelieveredToConsumer
- * is called, the message may already have been received and acknowledged,
- * and the body removed from the store.
+ * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
+ * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
+ * removed from the store.
*/
private boolean _immediate;
@@ -80,11 +72,16 @@ public class AMQMessage
private TransientMessageData _transientMessageData = new TransientMessageData();
+ private Subscription _takenBySubcription;
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
/**
- * Used to iterate through all the body frames associated with this message. Will not
- * keep all the data in memory therefore is memory-efficient.
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
*/
private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
@@ -103,7 +100,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -153,7 +150,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -166,7 +163,7 @@ public class AMQMessage
{
try
{
- return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
}
catch (AMQException e)
{
@@ -196,12 +193,14 @@ public class AMQMessage
}
/**
- * Used when recovering, i.e. when the message store is creating references to messages.
- * In that case, the normal enqueue/routingComplete is not done since the recovery process
- * is responsible for routing the messages to queues.
+ * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+ * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+ * queues.
+ *
* @param messageId
* @param store
* @param factory
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
@@ -213,8 +212,8 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header immediately
- * on construction.
+ * Used in testing only. This allows the passing of the content header immediately on construction.
+ *
* @param messageId
* @param info
* @param txnContext
@@ -228,14 +227,15 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header and some body fragments on
- * construction.
+ * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+ *
* @param messageId
* @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
@@ -280,7 +280,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
@@ -338,16 +338,14 @@ public class AMQMessage
return _messageId;
}
- /**
- * Threadsafe. Increment the reference count on the message.
- */
+ /** Threadsafe. Increment the reference count on the message. */
public void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
}
@@ -355,7 +353,7 @@ public class AMQMessage
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -371,7 +369,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
@@ -394,13 +392,13 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
}
}
- if(_referenceCount.get()<0)
+ if (_referenceCount.get() < 0)
{
throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
}
@@ -419,7 +417,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
@@ -427,10 +426,17 @@ public class AMQMessage
}
-
- public boolean taken()
+ public boolean taken(Subscription sub)
{
- return _taken.getAndSet(true);
+ if (_taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
+ }
}
public void release()
@@ -441,9 +447,9 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens==null)
+ if (_tokens == null)
{
- _tokens = new HashSet<Object>();
+ _tokens = new HashSet<Object>();
}
if (_tokens.contains(token))
@@ -458,11 +464,12 @@ public class AMQMessage
}
/**
- * Registers a queue to which this message is to be delivered. This is
- * called from the exchange when it is routing the message. This will be called before any content bodies have
- * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
+ * the message. This will be called before any content bodies have been received so that the choice of
+ * AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
+ *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
public void enqueue(AMQQueue queue) throws AMQException
@@ -483,16 +490,15 @@ public class AMQMessage
}
else
{
- return _messageHandle.isPersistent(getStoreContext(),_messageId);
+ return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
}
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
{
@@ -500,7 +506,7 @@ public class AMQMessage
if (_immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public MessagePublishInfo getMessagePublishInfo() throws AMQException
@@ -512,7 +518,7 @@ public class AMQMessage
}
else
{
- pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
return pb;
}
@@ -533,10 +539,7 @@ public class AMQMessage
}
- /**
- * Called when this message is delivered to a consumer. (used to
- * implement the 'immediate' flag functionality).
- */
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
@@ -566,7 +569,7 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q);
+ _txnContext.deliver(this, q, true);
}
}
finally
@@ -583,23 +586,22 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -609,9 +611,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -627,22 +629,21 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -652,9 +653,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -685,10 +686,10 @@ public class AMQMessage
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,7 +700,7 @@ public class AMQMessage
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ protocolSession.getProtocolMinorVersion(),
getMessagePublishInfo().getExchange(),
replyCode, replyText,
getMessagePublishInfo().getRoutingKey());
@@ -757,12 +758,11 @@ public class AMQMessage
}
catch (AMQException e)
{
- _log.error(e.toString(),e);
+ _log.error(e.toString(), e);
return 0;
}
- }
-
+ }
public void restoreTransientMessageData() throws AMQException
@@ -771,7 +771,7 @@ public class AMQMessage
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
- _transientMessageData = transientMessageData;
+ _transientMessageData = transientMessageData;
}
@@ -784,6 +784,11 @@ public class AMQMessage
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken;
+ _taken + " by:" + _takenBySubcription;
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return _takenBySubcription;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index e9ebe6c541..429829e201 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -45,13 +45,11 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
*/
public class AMQQueue implements Managable, Comparable
{
-
-
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -74,26 +72,19 @@ public class AMQQueue implements Managable, Comparable
private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
-
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
- /**
- * null means shared
- */
+ /** null means shared */
private final AMQShortString _owner;
private final boolean _durable;
- /**
- * If true, this queue is deleted when the last subscriber is removed
- */
+ /** If true, this queue is deleted when the last subscriber is removed */
private final boolean _autoDelete;
- /**
- * Holds subscribers to the queue.
- */
+ /** Holds subscribers to the queue. */
private final SubscriptionSet _subscribers;
private final SubscriptionFactory _subscriptionFactory;
@@ -106,20 +97,13 @@ public class AMQQueue implements Managable, Comparable
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- /**
- * Manages message delivery.
- */
+ /** Manages message delivery. */
private final DeliveryManager _deliveryMgr;
- /**
- * Used to track bindings to exchanges so that on deletion they can easily
- * be cancelled.
- */
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- /**
- * Executor on which asynchronous delivery will be carriedout where required
- */
+ /** Executor on which asynchronous delivery will be carriedout where required */
private final Executor _asyncDelivery;
private final AMQQueueMBean _managedObject;
@@ -127,39 +111,27 @@ public class AMQQueue implements Managable, Comparable
private final VirtualHost _virtualHost;
- /**
- * max allowed size(KB) of a single message
- */
+ /** max allowed size(KB) of a single message */
@Configured(path = "maximumMessageSize", defaultValue = "0")
public long _maximumMessageSize;
- /**
- * max allowed number of messages on a queue.
- */
+ /** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
public int _maximumMessageCount;
- /**
- * max queue depth for the queue
- */
+ /** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
public long _maximumQueueDepth;
- /**
- * maximum message age before alerts occur
- */
+ /** maximum message age before alerts occur */
@Configured(path = "maximumMessageAge", defaultValue = "0")
public long _maximumMessageAge;
- /**
- * the minimum interval between sending out consequetive alerts of the same type
- */
+ /** the minimum interval between sending out consequetive alerts of the same type */
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
- /**
- * total messages received by the queue since startup.
- */
+ /** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
@@ -176,7 +148,6 @@ public class AMQQueue implements Managable, Comparable
}
-
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
@@ -211,7 +182,7 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -251,17 +222,13 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
- /**
- * @return no of messages(undelivered) on the queue.
- */
+ /** @return no of messages(undelivered) on the queue. */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
- /**
- * @return List of messages(undelivered) on the queue.
- */
+ /** @return List of messages(undelivered) on the queue. */
public List<AMQMessage> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
@@ -275,6 +242,7 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -294,13 +262,12 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * moves messages from this queue to another queue. to do this the approach is following-
- * - setup the queue for moving messages (hold the lock and stop the async delivery)
- * - get all the messages available in the given message id range
- * - setup the other queue for moving messages (hold the lock and stop the async delivery)
- * - send these available messages to the other queue (enqueue in other queue)
- * - Once sending to other Queue is successful, remove messages from this queue
- * - remove locks from both queues and start async delivery
+ * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
+ * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message
+ * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these
+ * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
+ * remove messages from this queue - remove locks from both queues and start async delivery
+ *
* @param fromMessageId
* @param toMessageId
* @param queueName
@@ -316,7 +283,7 @@ public class AMQQueue implements Managable, Comparable
startMovingMessages();
List<AMQMessage> list = getMessagesOnTheQueue();
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+ int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
// Run this loop till you find all the messages or the list has no more messages
for (AMQMessage message : list)
@@ -344,7 +311,7 @@ public class AMQQueue implements Managable, Comparable
{
// remove the lock and start the async delivery
anotherQueue.stopMovingMessages();
- stopMovingMessages();
+ stopMovingMessages();
}
}
@@ -364,10 +331,8 @@ public class AMQQueue implements Managable, Comparable
_deliveryMgr.stopMovingMessages();
_deliveryMgr.processAsync(_asyncDelivery);
}
-
- /**
- * @return MBean object associated with this Queue
- */
+
+ /** @return MBean object associated with this Queue */
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -422,20 +387,16 @@ public class AMQQueue implements Managable, Comparable
public long getOldestMessageArrivalTime()
{
return _deliveryMgr.getOldestMessageArrival();
-
+
}
- /**
- * Removes the AMQMessage from the top of the queue.
- */
+ /** Removes the AMQMessage from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
- /**
- * removes all the messages from the queue.
- */
+ /** removes all the messages from the queue. */
public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
@@ -443,10 +404,10 @@ public class AMQQueue implements Managable, Comparable
public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
- exchange.registerQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ exchange.registerQueue(routingKey, this, arguments);
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
}
_bindings.addBinding(routingKey, arguments, exchange);
}
@@ -454,9 +415,9 @@ public class AMQQueue implements Managable, Comparable
public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
}
_bindings.remove(routingKey, arguments, exchange);
}
@@ -466,30 +427,31 @@ public class AMQQueue implements Managable, Comparable
FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
- if(incrementSubscriberCount() > 1)
+ if (incrementSubscriberCount() > 1)
{
- if(isExclusive())
+ if (isExclusive())
{
decrementSubscriberCount();
throw EXISTING_EXCLUSIVE;
}
- else if(exclusive)
+ else if (exclusive)
{
decrementSubscriberCount();
throw EXISTING_SUBSCRIPTION;
}
}
- else if(exclusive)
+ else if (exclusive)
{
setExclusive(true);
}
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
+ filters, noLocal, this);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -537,10 +499,10 @@ public class AMQQueue implements Managable, Comparable
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ removedSubscription.close();
setExclusive(false);
decrementSubscriberCount();
-
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -583,13 +545,13 @@ public class AMQQueue implements Managable, Comparable
public void delete() throws AMQException
{
- if(!_deleted.getAndSet(true))
+ if (!_deleted.getAndSet(true))
{
_subscribers.queueDeleted(this);
_bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
_managedObject.unregister();
- for(Task task : _deleteTaskList)
+ for (Task task : _deleteTaskList)
{
task.doTask(this);
}
@@ -605,7 +567,8 @@ public class AMQQueue implements Managable, Comparable
public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ //fixme not sure what this is doing. should we be passing deliverFirst through here?
+ _deliveryMgr.deliver(storeContext, getName(), msg, false);
try
{
msg.checkDeliveredToConsumer();
@@ -620,9 +583,9 @@ public class AMQQueue implements Managable, Comparable
}
- public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -731,7 +694,7 @@ public class AMQQueue implements Managable, Comparable
public static interface Task
{
- public void doTask(AMQQueue queue) throws AMQException;
+ public void doTask(AMQQueue queue) throws AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -759,4 +722,8 @@ public class AMQQueue implements Managable, Comparable
_maximumMessageAge = maximumMessageAge;
}
+ public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+ {
+ _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 0fc8753a87..208a59516c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -24,9 +24,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -38,12 +43,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-/**
- * Manages delivery of messages on behalf of a queue
- */
+/** Manages delivery of messages on behalf of a queue */
public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -51,47 +56,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
@Configured(path = "advanced.compressBufferOnQueue",
defaultValue = "false")
public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
- private final ReentrantLock _messageAccessLock = new ReentrantLock();
+ /** Holds any queued messages */
+ private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
+ /** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
+ /** The subscriptions on the queue to whom messages are delivered */
private final SubscriptionManager _subscriptions;
/**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
+ * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
+ * acknowledgements a handle on the queue.
*/
private final AMQQueue _queue;
/**
- * Flag used while moving messages from this queue to another. For moving messages the async delivery
- * should also stop. This flat should be set to true to stop async delivery and set to false to enable
- * async delivery again.
+ * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
+ * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again.
*/
private AtomicBoolean _movingMessages = new AtomicBoolean();
-
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
+ * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
+ * of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
-
+ private AtomicInteger _extraMessages = new AtomicInteger();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,7 +103,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private boolean addMessageToQueue(AMQMessage msg)
+ private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
{
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
@@ -122,7 +116,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
_totalMessageSize.addAndGet(msg.getSize());
@@ -135,7 +136,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !(_messages.isEmpty() && _hasContent.isEmpty());
}
finally
{
@@ -149,18 +150,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
+ * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
*
* @return int the number of messages in the delivery queue.
*/
private int getMessageCount()
{
- return _messages.size();
+ return _messages.size() + _extraMessages.get();
}
-
public long getTotalMessageSize()
{
return _totalMessageSize.get();
@@ -172,6 +172,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ {
+ _lock.lock();
+ try
+ {
+ if (hasContent)
+ {
+ _log.debug("Queue has adding subscriber content");
+ _hasContent.add(subscription);
+ _totalMessageSize.addAndGet(msg.getSize());
+ _extraMessages.addAndGet(1);
+ }
+ else
+ {
+ _log.debug("Queue has removing subscriber content");
+ if (msg == null)
+ {
+ _hasContent.remove(subscription);
+ }
+ else
+ {
+ _totalMessageSize.addAndGet(-msg.getSize());
+ _extraMessages.addAndGet(-1);
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
public List<AMQMessage> getMessages()
{
@@ -195,7 +227,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = currentQueue.next();
if (subscription.hasInterest(message))
{
- subscription.enqueueForPreDelivery(message);
+ subscription.enqueueForPreDelivery(message, false);
}
}
}
@@ -203,7 +235,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
AMQMessage msg = getNextMessage();
- if(msg == null)
+ if (msg == null)
{
return false;
}
@@ -229,7 +261,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
_queue.dequeue(channel.getStoreContext(), msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -252,8 +284,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
- * so that the asyn delivery is also stopped.
+ * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
+ * the asyn delivery is also stopped.
*/
public void startMovingMessages()
{
@@ -262,8 +294,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
- * so that the async delivery can start again.
+ * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
+ * the async delivery can start again.
*/
public void stopMovingMessages()
{
@@ -276,6 +308,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Messages will be removed from this queue and all preDeliveryQueues
+ *
* @param messageList
*/
public void removeMovedMessages(List<AMQMessage> messageList)
@@ -308,7 +341,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+ *
* @param storeContext
+ *
* @throws AMQException
*/
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
@@ -318,11 +353,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
// mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
getNextMessage();
}
-
+
_lock.unlock();
}
@@ -335,7 +370,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
//mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
msg = getNextMessage();
count++;
@@ -347,20 +382,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public synchronized AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages);
+ return getNextMessage(_messages, null);
}
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
- {
- return getNextMessage(messages, false);
- }
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && (browsing || message.taken()))
+
+ while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -371,27 +401,76 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ ") from queue (" + System.identityHashCode(messageQueue) +
+ ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ }
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ }
+ return;
+ }
+
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub.isBrowser());
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
+
+ if (messageQueue == sub.getResendQueue())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("All messages sent from resendQueue for " + sub);
+ }
+ if (messageQueue.isEmpty())
+ {
+ subscriberHasPendingResend(false, sub, null);
+ //better to use the above method as this keeps all the tracking in one location.
+// _hasContent.remove(sub);
+ }
+
+ _extraMessages.decrementAndGet();
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ _log.info("We could do clean up of the main _message queue here");
+ }
+
_totalMessageSize.addAndGet(-message.getSize());
}
catch (AMQException e)
@@ -403,6 +482,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* enqueues the messages in the list on the queue and all required predelivery queues
+ *
* @param storeContext
* @param movedMessageList
*/
@@ -411,7 +491,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
for (AMQMessage msg : movedMessageList)
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, true);
}
// enqueue on the pre delivery queues
@@ -422,7 +502,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, true);
}
}
}
@@ -430,8 +510,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
+ * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
+ * deliver().
*/
private void processQueue()
{
@@ -444,40 +524,43 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended())
+ synchronized (sub.getSendLock())
{
- sendNextMessage(sub);
-
- hasSubscribers = true;
- }
- }
- }
- }
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub, _queue);
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
+ hasSubscribers = true;
+ }
}
}
}
- else
- {
- sendNextMessage(sub, _messages);
- }
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
+// private void sendNextMessage(Subscription sub)
+// {
+// if (sub.hasFilters())
+// {
+// sendNextMessage(sub, sub.getPreDeliveryQueue());
+// if (sub.isAutoClose())
+// {
+// if (sub.getPreDeliveryQueue().isEmpty())
+// {
+// sub.close();
+// }
+// }
+// }
+// else
+// {
+// sendNextMessage(sub, _messages);
+// }
+// }
+
+ public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + msg);
+ _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
}
msg.release();
@@ -491,11 +574,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -504,7 +587,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
_log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to.");
+ " subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
@@ -528,7 +611,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, deliverFirst);
}
}
}
@@ -537,14 +620,47 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
//release lock now
_lock.unlock();
-
- if (_log.isDebugEnabled())
+ synchronized (s.getSendLock())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
+ }
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
+ }
+ }
}
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
@@ -593,9 +709,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.debug("Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -608,4 +722,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ private String currentStatus()
+ {
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+ " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
+ "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get() +
+ " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 27abca012b..5b77951dfd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -32,8 +32,8 @@ import org.apache.qpid.server.store.StoreContext;
interface DeliveryManager
{
/**
- * Determines whether there are queued messages. Sets _queueing to false if
- * there are no queued messages. This needs to be atomic.
+ * Determines whether there are queued messages. Sets _queueing to false if there are no queued messages. This needs
+ * to be atomic.
*
* @return true if there are queued messages
*/
@@ -43,34 +43,34 @@ interface DeliveryManager
* This method should not be used to determin if there are messages in the queue.
*
* @return int The number of messages in the queue
+ *
* @use hasQueuedMessages() for all controls relating to having messages on the queue.
*/
int getQueueMessageCount();
/**
- * Requests that the delivery manager start processing the queue asynchronously
- * if there is work that can be done (i.e. there are messages queued up and
- * subscribers that can receive them.
- * <p/>
- * This should be called when subscribers are added, but only after the consume-ok
- * message has been returned as message delivery may start immediately. It should also
- * be called after unsuspending a client.
- * <p/>
+ * Requests that the delivery manager start processing the queue asynchronously if there is work that can be done
+ * (i.e. there are messages queued up and subscribers that can receive them. <p/> This should be called when
+ * subscribers are added, but only after the consume-ok message has been returned as message delivery may start
+ * immediately. It should also be called after unsuspending a client. <p/>
*
* @param executor the executor on which the delivery should take place
*/
void processAsync(Executor executor);
/**
- * Handles message delivery. The delivery manager is always in one of two modes;
- * it is either queueing messages for asynchronous delivery or delivering
- * directly.
+ * Handles message delivery. The delivery manager is always in one of two modes; it is either queueing messages for
+ * asynchronous delivery or delivering directly.
+ *
+ * @param storeContext
+ * @param name the name of the entity on whose behalf we are delivering the message
+ * @param msg the message to deliver
+ * @param deliverFirst
*
- * @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
- * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
+ * @throws org.apache.qpid.server.queue.FailedDequeueException
+ * if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
@@ -93,4 +93,6 @@ interface DeliveryManager
long getTotalMessageSize();
long getOldestMessageArrival();
+
+ void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index fa70c6dbac..e9f209839a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -38,13 +38,23 @@ public interface Subscription
Queue<AMQMessage> getPreDeliveryQueue();
- void enqueueForPreDelivery(AMQMessage msg);
+ Queue<AMQMessage> getResendQueue();
+
+ Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+
+ void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
boolean isAutoClose();
void close();
+ boolean isClosed();
+
boolean isBrowser();
boolean wouldSuspend(AMQMessage msg);
+
+ void addToResendQueue(AMQMessage msg);
+
+ Object getSendLock();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 6902788fc8..917f7c4e97 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -26,16 +26,16 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.AMQProtocolSession;
/**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
- * factory primarily assists testing although in future more sophisticated subscribers may need a different
- * subscription implementation.
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
+ * primarily assists testing although in future more sophisticated subscribers may need a different subscription
+ * implementation.
*
* @see org.apache.qpid.server.queue.AMQQueue
*/
public interface SubscriptionFactory
{
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException;
+ FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException;
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 6bdfeccc0f..ede7731a06 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
@@ -37,6 +37,8 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
/**
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -52,9 +54,11 @@ public class SubscriptionImpl implements Subscription
public final AMQShortString consumerTag;
- private final Object sessionKey;
+ private final Object _sessionKey;
- private Queue<AMQMessage> _messages;
+ private MessageQueue<AMQMessage> _messages;
+
+ private Queue<AMQMessage> _resendQueue;
private final boolean _noLocal;
@@ -63,20 +67,27 @@ public class SubscriptionImpl implements Subscription
private FilterManager _filters;
private final boolean _isBrowser;
private final Boolean _autoClose;
- private boolean _closed = false;
+ private boolean _sentClose = false;
+
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private AMQQueue _queue;
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+
+
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null);
}
}
@@ -84,25 +95,27 @@ public class SubscriptionImpl implements Subscription
AMQShortString consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false);
+ this(channelId, protocolSession, consumerTag, acks, null, false, null);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
- {
+ {
throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
}
this.channel = channel;
this.protocolSession = protocolSession;
this.consumerTag = consumerTag;
- sessionKey = protocolSession.getKey();
+ _sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
+ _queue = queue;
_filters = FilterManagerFactory.createManager(filters);
@@ -145,9 +158,7 @@ public class SubscriptionImpl implements Subscription
if (_filters != null)
{
- _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
-
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
else
{
@@ -169,30 +180,47 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /** Equality holds if the session matches and the channel and consumer tag are the same. */
+ /**
+ * Equality holds if the session matches and the channel and consumer tag are the same.
+ *
+ * @param psc The subscriptionImpl to compare
+ *
+ * @return equality
+ */
private boolean equals(SubscriptionImpl psc)
{
- return sessionKey.equals(psc.sessionKey)
+ return _sessionKey.equals(psc._sessionKey)
&& psc.channel == channel
&& psc.consumerTag.equals(consumerTag);
}
public int hashCode()
{
- return sessionKey.hashCode();
+ return _sessionKey.hashCode();
}
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
- * @param msg
- * @param queue
+ * @param msg The message to send
+ * @param queue the Queue it has been sent from
*
* @throws AMQException
*/
@@ -278,7 +306,18 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -376,11 +415,18 @@ public class SubscriptionImpl implements Subscription
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
if (_messages != null)
{
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
}
}
@@ -391,19 +437,95 @@ public class SubscriptionImpl implements Subscription
public void close()
{
- if (!_closed)
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ }
+
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ requeue();
+ }
+
+ //remove references in PDQ
+ if (_messages != null)
+ {
+ _messages.clear();
+ }
+
+ if (_autoClose && !_sentClose)
{
_logger.info("Closing autoclose subscription:" + this);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
consumerTag // consumerTag
));
- _closed = true;
+ _sentClose = true;
+ }
+ }
+
+ private void requeue()
+ {
+ if (_queue != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Requeuing :" + _resendQueue.size() + " messages");
+ }
+
+ while (!_resendQueue.isEmpty())
+ {
+ AMQMessage resent = _resendQueue.poll();
+
+ resent.release();
+ _queue.subscriberHasPendingResend(false, this, resent);
+
+ try
+ {
+ channel.getTransactionalContext().deliver(resent, _queue, true);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to re-deliver messages", e);
+ }
+ }
+
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
+ }
+
+ _queue.subscriberHasPendingResend(false, this, null);
}
+ else
+ {
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("Unable to re-deliver messages as queue is null.");
+ }
+ }
+
+ // Clear the messages
+ _resendQueue = null;
+ }
+
+
+ public boolean isClosed()
+ {
+ return _sendLock.get(); // This rather than _close is used to signify the subscriber is now closed.
}
public boolean isBrowser()
@@ -416,5 +538,61 @@ public class SubscriptionImpl implements Subscription
return channel.wouldSuspend(msg);
}
+ public Queue<AMQMessage> getResendQueue()
+ {
+ if (_resendQueue == null)
+ {
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ return _resendQueue;
+ }
+
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ return _resendQueue;
+ }
+
+ if (_filters != null)
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ close();
+ return null;
+ }
+ }
+ return _messages;
+ }
+ else // we want the DM queue
+ {
+ return messages;
+ }
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ // add to our resend queue
+ getResendQueue().add(msg);
+
+ // Mark Queue has having content.
+ if (_queue == null)
+ {
+ _logger.error("Queue is null won't be able to resend messages");
+ }
+ else
+ {
+ _queue.subscriberHasPendingResend(true, this, msg);
+ }
+ }
+
+ public Object getSendLock()
+ {
+ return _sendLock;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 871f063725..26b040aae0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -27,27 +27,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
+/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
class SubscriptionSet implements WeightedSubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /**
- * List of registered subscribers
- */
+ /** List of registered subscribers */
private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
- /**
- * Used to control the round robin delivery of content
- */
+ /** Used to control the round robin delivery of content */
private int _currentSubscriber;
+ private final Object _subscriptionsChange = new Object();
- /**
- * Accessor for unit tests.
- */
+
+ /** Accessor for unit tests. */
int getCurrentSubscriber()
{
return _currentSubscriber;
@@ -55,21 +48,43 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- _subscriptions.add(subscription);
+ synchronized (_subscriptionsChange)
+ {
+ _subscriptions.add(subscription);
+ }
}
/**
* Remove the subscription, returning it if it was found
*
* @param subscription
+ *
* @return null if no match was found
*/
public Subscription removeSubscriber(Subscription subscription)
{
- boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
- if (isRemoved)
+ // TODO: possibly need O(1) operation here.
+
+ Subscription sub = null;
+ synchronized (_subscriptionsChange)
{
- return subscription;
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
+ {
+ //we can't just return the passed in subscription as it is a new object
+ // and doesn't contain the stored state we need.
+ //NOTE while this may be removed now anyone with an iterator will still have it in the list!!
+ sub = _subscriptions.remove(subIndex);
+ }
+ else
+ {
+ _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription);
+ }
+ }
+ if (sub != null)
+ {
+ return sub;
}
else
{
@@ -92,14 +107,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Return the next unsuspended subscription or null if not found.
- * <p/>
- * Performance note:
- * This method can scan all items twice when looking for a subscription that is not
- * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
- * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
- * race conditions and when subscriptions are removed between calls to nextSubscriber, the
- * IndexOutOfBoundsException also causes the scan to start at the beginning.
+ * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all
+ * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions
+ * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed
+ * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
+ * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
public Subscription nextSubscriber(AMQMessage msg)
{
@@ -156,9 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- /**
- * Overridden in test classes.
- */
+ /** Overridden in test classes. */
protected void subscriberScanned()
{
}
@@ -199,8 +209,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Notification that a queue has been deleted. This is called so that the subscription can inform the
- * channel, which in turn can update its list of unacknowledged messages.
+ * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
+ * in turn can update its list of unacknowledged messages.
*
* @param queue
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 29efdd9513..d12f5cd084 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -55,6 +55,7 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
@@ -82,8 +83,9 @@ import org.apache.qpid.server.handler.QueueDeclareHandler;
import org.apache.qpid.server.handler.QueueDeleteHandler;
import org.apache.qpid.server.handler.QueuePurgeHandler;
import org.apache.qpid.server.handler.TxCommitHandler;
-import org.apache.qpid.server.handler.TxRollbackHandler;
+import org.apache.qpid.server.handler.BasicRejectMethodHandler;
import org.apache.qpid.server.handler.TxSelectHandler;
+import org.apache.qpid.server.handler.TxRollbackHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -173,6 +175,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance());
+ frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
deleted file mode 100644
index 4dff514ff4..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.txn;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class DeliverMessageOperation implements TxnOp
-{
- private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
-
- private final AMQMessage _msg;
-
- private final AMQQueue _queue;
-
- public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
- {
- _msg = msg;
- _queue = queue;
- _msg.incrementReference();
- }
-
- public void prepare(StoreContext context) throws AMQException
- {
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit(StoreContext context) throws AMQException
- {
- //do the memeory part of the record()
- _msg.incrementReference();
- //then process the message
- try
- {
- _queue.process(context, _msg);
- }
- catch (AMQException e)
- {
- //TODO: is there anything else we can do here? I think not...
- _logger.error("Error during commit of a queue delivery: " + e, e);
- }
- }
-
- public void rollback(StoreContext storeContext)
- {
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 5c915b5c84..e5cce672f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -31,9 +31,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-/**
- * A transactional context that only supports local transactions.
- */
+/** A transactional context that only supports local transactions. */
public class LocalTransactionalContext implements TransactionalContext
{
private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);
@@ -62,12 +60,14 @@ public class LocalTransactionalContext implements TransactionalContext
{
public AMQMessage message;
public AMQQueue queue;
+ private boolean deliverFirst;
- public DeliveryDetails(AMQMessage message, AMQQueue queue)
+ public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
{
this.message = message;
this.queue = queue;
+ this.deliverFirst = deliverFirst;
}
}
@@ -89,9 +89,10 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
_txnBuffer.rollback(_storeContext);
+ _postCommitDeliveryList.clear();
}
- public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
@@ -100,7 +101,7 @@ public class LocalTransactionalContext implements TransactionalContext
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(message, queue));
+ _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
@@ -225,7 +226,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
- dd.queue.process(_storeContext, dd.message);
+ dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index c7f3a0f0f1..19146da22e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -34,21 +34,15 @@ import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-/**
- * @author Apache Software Foundation
- */
+/** @author Apache Software Foundation */
public class NonTransactionalContext implements TransactionalContext
{
private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
- /**
- * Channel is useful for logging
- */
+ /** Channel is useful for logging */
private final AMQChannel _channel;
- /**
- * Where to put undeliverable messages
- */
+ /** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
private Set<Long> _browsedAcks;
@@ -57,9 +51,7 @@ public class NonTransactionalContext implements TransactionalContext
private StoreContext _storeContext;
- /**
- * Whether we are in a transaction
- */
+ /** Whether we are in a transaction */
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
@@ -97,12 +89,12 @@ public class NonTransactionalContext implements TransactionalContext
// Does not apply to this context
}
- public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
{
try
{
message.incrementReference();
- queue.process(_storeContext, message);
+ queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
message.checkDeliveredToConsumer();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index 59d9117fda..88451e2fca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -38,7 +38,7 @@ public interface TransactionalContext
void rollback() throws AMQException;
- void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+ void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ad600ddb40..89f596e541 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _connectionStopped;
}
- void setConnectionStopped(boolean connectionStopped)
+ boolean setConnectionStopped(boolean connectionStopped)
{
+ boolean currently;
synchronized (_lock)
{
+ currently = _connectionStopped;
_connectionStopped = connectionStopped;
_lock.notify();
}
+ return currently;
}
private void dispatchMessage(UnprocessedMessage message)
@@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(true);
+ suspendChannel(true);
}
_connection.getProtocolHandler().syncWrite(
@@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(false);
+ suspendChannel(false);
}
}
catch (AMQException e)
@@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isSuspended = isSuspended();
-// if (!isSuspended)
-// {
-// suspendChannel(true);
-// }
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
@@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false) // requeue
, BasicRecoverOkBody.class);
-// if (_dispatcher != null)
-// {
-// _dispatcher.rollback();
-// }
-//
-// if (!isSuspended)
-// {
-// suspendChannel(false);
-// }
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
}
catch (AMQException e)
{
@@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_dispatcher == null)
{
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- }
+ }// if the dispatcher is running we have to do the clean up in the Ok Handler.
}
}
@@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
rejectMessagesForConsumerTag(null, requeue);
}
- /** @param consumerTag The consumerTag to prune from queue or all if null
- * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ /**
+ * @param consumerTag The consumerTag to prune from queue or all if null
+ * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
*/
private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
@@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messages.remove();
-// rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 496e377435..e9b914425a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
}
- for (Object o : _synchronousQueue)
+ Iterator iterator = _synchronousQueue.iterator();
+ while (iterator.hasNext())
{
+ Object o = iterator.next();
+
if (o instanceof AbstractJMSMessage)
{
-// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+ _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
if (_logger.isTraceEnabled())
{
_logger.trace("Rejected message" + o);
+ iterator.remove();
}
}
else
{
_logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
}
}
if (_synchronousQueue.size() != 0)
{
- _logger.warn("Queue was not empty after rejecting all messages");
+ _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
}
_synchronousQueue.clear();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 67d74055c6..36dd4d400c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
switch (contentType)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
}
//Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
@@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public abstract void clearBodyImpl() throws JMSException;
/**
- * Get a String representation of the body of the message. Used in the
- * toString() method which outputs this before message properties.
+ * Get a String representation of the body of the message. Used in the toString() method which outputs this before
+ * message properties.
*/
public abstract String toBodyString() throws JMSException;
@@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
buf.append("\nJMS priority: ").append(getJMSPriority());
buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Redelivered: ").append(_redelivered);
+ buf.append("\nJMS Destination: ").append(getJMSDestination());
+ buf.append("\nJMS Type: ").append(getJMSType());
+ buf.append("\nJMS MessageID: ").append(getJMSMessageID());
buf.append("\nAMQ message number: ").append(_deliveryTag);
+
buf.append("\nProperties:");
if (getJmsHeaders().isEmpty())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 988a12ee78..d0cc52271a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1
- * mapping between connection instances and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+ * and protocol handler instances.
*/
private AMQConnection _connection;
- /**
- * Our wrapper for a protocol session that provides access to session values
- * in a typesafe manner.
- */
+ /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
private AMQStateManager _stateManager = new AMQStateManager();
@@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// we only add the SSL filter where we have an SSL connection
if (_connection.getSSLConfiguration() != null)
{
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLConfiguration sslConfig = _connection.getSSLConfiguration();
+ SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
@@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
e.printStackTrace();
}
-
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* sessionClosed() depending on whether we were trying to send data at the time of failure.
*
* @param session
+ *
* @throws Exception
*/
public void sessionClosed(IoSession session) throws Exception
@@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("Protocol Session [" + this + "] closed");
}
- /**
- * See {@link FailoverHandler} to see rationale for separate thread.
- */
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
Thread failoverThread = new Thread(_failoverHandler);
@@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * There are two cases where we have other threads potentially blocking for events to be handled by this
- * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
- * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
- * react appropriately.
+ * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+ * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+ * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
* @param e the exception to propagate
*/
@@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch(bodyFrame.getFrameType())
+ switch (bodyFrame.getFrameType())
{
case AMQMethodBody.TYPE:
if (debug)
{
- _logger.debug("Method frame received: " + frame);
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
@@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.messageContentBodyReceived(frame.getChannel(),
(ContentBody) bodyFrame);
break;
-
+
case HeartbeatBody.TYPE:
- if(debug)
+ if (debug)
{
_logger.debug("Received heartbeat");
}
@@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
+ BlockingMethodFrameListener listener)
throws AMQException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
+ BlockingMethodFrameListener listener, long timeout)
throws AMQException
{
try
@@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
{
return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
{
return writeCommandFrameAndWaitForReply(frame,
@@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method to register an AMQSession with the protocol handler. Registering
- * a session with the protocol handler will ensure that messages are delivered to the
- * consumer(s) on that session.
+ * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+ * handler will ensure that messages are delivered to the consumer(s) on that session.
*
* @param channelId the channel id of the session
* @param session the session instance.
@@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * @return the number of bytes read from this protocol session
- */
+ /** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
return _protocolSession.getIoSession().getReadBytes();
}
- /**
- * @return the number of bytes written to this protocol session
- */
+ /** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
return _protocolSession.getIoSession().getWrittenBytes();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
new file mode 100644
index 0000000000..0d75a6b968
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+/**
+ * This class tests a number of commits and roll back scenarios
+ *
+ * Assumptions; - Assumes empty Queue
+ */
+public class CommitRollbackTest extends TestCase
+{
+ protected AMQConnection conn;
+ protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected String payload = "xyzzy";
+ private Session _session;
+ private MessageProducer _publisher;
+ private Session _pubSession;
+ private MessageConsumer _consumer;
+ Queue _jmsQueue;
+
+ private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ newConnection();
+ }
+
+ private void newConnection() throws AMQException, URLSyntaxException, JMSException
+ {
+ conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
+
+ _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ _jmsQueue = _session.createQueue(queue);
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ _publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
+
+ conn.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ conn.close();
+ TransportConnection.killVMBroker(1);
+ }
+
+ /** PUT a text message, disconnect before commit, confirm it is gone. */
+ public void testPutThenDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("reconnecting without commit");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ //commit to ensure message is removed from queue
+ _session.commit();
+
+ assertNull("test message was put and disconnected before commit, but is still present", result);
+ }
+
+ /** PUT a text message, disconnect before commit, confirm it is gone. */
+ public void testPutThenCloseDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("closing publisher without commit");
+ _publisher.close();
+
+ _logger.info("reconnecting without commit");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ //commit to ensure message is removed from queue
+ _session.commit();
+
+ assertNull("test message was put and disconnected before commit, but is still present", result);
+ }
+
+ /**
+ * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
+ * session as producer
+ */
+ public void testPutThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenRollback";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("rolling back");
+ _pubSession.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ assertNull("test message was put and rolled back, but is still present", result);
+ }
+
+ /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+ public void testGetThenDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+ assertNotNull("retrieved message is null", msg);
+
+ _logger.info("closing connection");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+
+ assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ }
+
+ /**
+ * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
+ * same connection but different session as producer
+ */
+ public void testGetThenCloseDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenCloseDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("reconnecting without commit");
+ _consumer.close();
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+
+ assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ }
+
+ /**
+ * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
+ * session to the producer
+ */
+ public void testGetThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("rolling back");
+
+ _session.rollback();
+
+ _logger.info("receiving result");
+
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ }
+
+ /**
+ * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
+ * connection but different session as producer
+ */
+ public void testGetThenCloseRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenCloseRollback";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("Closing consumer");
+ _consumer.close();
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("receiving result");
+
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ }
+
+
+ /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+ public void testSend2ThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+ assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+
+ assertNull("test message should be null", result);
+ }
+
+ public void testSend2ThenCloseAfter1andTryAgain() throws Exception
+ {
+// assertTrue("session is not transacted", _session.getTransacted());
+// assertTrue("session is not transacted", _pubSession.getTransacted());
+//
+// _logger.info("sending two test messages");
+// _publisher.send(_pubSession.createTextMessage("1"));
+// _publisher.send(_pubSession.createTextMessage("2"));
+// _pubSession.commit();
+//
+// _logger.info("getting test message");
+// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+//
+// _consumer.close();
+//
+// _consumer = _session.createConsumer(_jmsQueue);
+//
+// _logger.info("receiving result");
+// Message result = _consumer.receive(1000);
+// _logger.error("1:" + result);
+//// assertNotNull("test message was consumed and rolled back, but is gone", result);
+//// assertEquals("1" , ((TextMessage) result).getText());
+//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// _logger.error("2" + result);
+//// assertNotNull("test message was consumed and rolled back, but is gone", result);
+//// assertEquals("2", ((TextMessage) result).getText());
+//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// _logger.error("3" + result);
+// assertNull("test message should be null:" + result, result);
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 4296e43f88..94cbb426e5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -63,12 +63,11 @@ public class TransactedTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
- session = con.createSession(true, 0);
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
MessageConsumer consumer2 = session.createConsumer(queue2);
@@ -81,7 +80,6 @@ public class TransactedTest extends TestCase
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
-
//add some messages
prepProducer1.send(prepSession.createTextMessage("A"));
prepProducer1.send(prepSession.createTextMessage("B"));
@@ -127,24 +125,33 @@ public class TransactedTest extends TestCase
public void testRollback() throws Exception
{
+ _logger.info("Sending X Y Z");
producer2.send(session.createTextMessage("X"));
producer2.send(session.createTextMessage("Y"));
producer2.send(session.createTextMessage("Z"));
+ _logger.info("Receiving A B");
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
- expect("C", consumer1.receive(1000));
+ //Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
//rollback
+ _logger.info("rollback");
session.rollback();
+ _logger.info("Receiving A B C");
//ensure sent messages are not visible and received messages are requeued
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
+
+ _logger.info("Starting new connection");
testCon.start();
testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Testing we have no messages left");
assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
+
+ session.commit();
}
public void testResendsMsgsAfterSessionClose() throws Exception
@@ -152,7 +159,7 @@ public class TransactedTest extends TestCase
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
@@ -225,8 +232,9 @@ public class TransactedTest extends TestCase
private void expect(String text, Message msg) throws JMSException
{
- assertTrue(msg instanceof TextMessage);
- assertEquals(text, ((TextMessage) msg).getText());
+ assertNotNull("Message should not be null", msg);
+ assertTrue("Message should be a text message", msg instanceof TextMessage);
+ assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
}
public static junit.framework.Test suite()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index ecabd9320a..9fa96ece1e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
/**
* Represents a shared queue in a cluster. The key difference is that as well as any
@@ -56,10 +55,10 @@ public class ClusteredQueue extends AMQQueue
}
- public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
_logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this));
- super.process(storeContext, msg);
+ super.process(storeContext, msg, deliverFirst);
}
protected void autodelete() throws AMQException
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 364aea81c0..a5ace41752 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -117,7 +117,17 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return null;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public Queue<AMQMessage> getResendQueue()
+ {
+ return null;
+ }
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
//no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
}
@@ -132,6 +142,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
//no-op
}
+ public boolean isClosed()
+ {
+ return false;
+ }
+
public boolean isBrowser()
{
return false;
@@ -142,4 +157,14 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _suspended;
}
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
+ }
+
+ public Object getSendLock()
+ {
+ return new Object();
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
new file mode 100644
index 0000000000..cdf686b4cb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.log4j.Logger;
+
+import java.util.Queue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E>
+{
+ private static final Logger _logger = Logger.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class);
+
+ protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>();
+
+ protected AtomicInteger _messageHeadSize = new AtomicInteger(0);
+
+ @Override
+ public int size()
+ {
+ return super.size() + _messageHeadSize.get();
+ }
+
+ @Override
+ public E poll()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.poll();
+ }
+ else
+ {
+ _logger.debug("Providing item from message head");
+
+ E e = _messageHead.poll();
+
+ if (e != null)
+ {
+ _messageHeadSize.decrementAndGet();
+ }
+
+ return e;
+ }
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+
+ if (_messageHead.isEmpty())
+ {
+ return super.remove(o);
+ }
+ else
+ {
+ if (_messageHead.remove(o))
+ {
+ _messageHeadSize.decrementAndGet();
+ return true;
+ }
+
+ return super.remove(o);
+ }
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c)
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.removeAll(c);
+ }
+ else
+ {
+ //fixme this is super.removeAll but iterator here doesn't work
+ // we need to be able to correctly decrement _messageHeadSize
+// boolean modified = false;
+// Iterator<?> e = iterator();
+// while (e.hasNext())
+// {
+// if (c.contains(e.next()))
+// {
+// e.remove();
+// modified = true;
+// _size.decrementAndGet();
+// }
+// }
+// return modified;
+
+ throw new RuntimeException("Not implemented");
+ }
+ }
+
+
+ @Override
+ public boolean isEmpty()
+ {
+ return (_messageHead.isEmpty() && super.isEmpty());
+ }
+
+ @Override
+ public void clear()
+ {
+ super.clear();
+ _messageHead.clear();
+ }
+
+ @Override
+ public boolean contains(Object o)
+ {
+ return _messageHead.contains(o) || super.contains(o);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> o)
+ {
+ return _messageHead.containsAll(o) || super.containsAll(o);
+ }
+
+ @Override
+ public E element()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.element();
+ }
+ else
+ {
+ return _messageHead.element();
+ }
+ }
+
+ @Override
+ public E peek()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.peek();
+ }
+ else
+ {
+ _logger.debug("Providing item from message head");
+ return _messageHead.peek();
+ }
+
+ }
+
+ @Override
+ public Iterator<E> iterator()
+ {
+ throw new RuntimeException("Not Implemented");
+
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c)
+ {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ @Override
+ public Object[] toArray()
+ {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ public boolean pushHead(E o)
+ {
+ _logger.debug("Adding item to head of queue");
+ if (_messageHead.offer(o))
+ {
+ _messageHeadSize.incrementAndGet();
+ return true;
+ }
+ return false;
+ }
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java
new file mode 100644
index 0000000000..9cf3319374
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.Queue;
+
+public interface MessageQueue<E> extends Queue<E>
+{
+
+ boolean pushHead(E o);
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6beeb92053..ccd23bc0bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -272,9 +272,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
* @param msg
+ * @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, AMQMessage msg) throws AMQException
+ public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
{
messages.add(new HeadersExchangeTest.Message(msg));
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 2d0315d7f5..26332579cb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -19,7 +19,6 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -151,7 +150,7 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage msg = message(false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- _queue.process(_storeContext, msg);
+ _queue.process(_storeContext, msg, false);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
_queueMBean.viewMessageContent(id);
try
@@ -216,7 +215,7 @@ public class AMQQueueMBeanTest extends TestCase
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i]);
+ _queue.process(_storeContext, messages[i], false);
}
for (int i = 0; i < messages.length; i++)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index 6f3d42d090..4971db2d28 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -194,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper
AMQMessage msg = nextMessage();
if (msg != null)
{
- _deliveryMgr.deliver(null, new AMQShortString(toString()), msg);
+ _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);
}
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index e1be640c8e..dc5a6d3cf6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -49,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -59,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
assertTrue(s1.getMessages().isEmpty());
@@ -97,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
assertEquals(batch, s1.getMessages().size());
@@ -111,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
s1.setSuspended(true);
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
_mgr.processAsync(new OnCurrentThreadExecutor());
@@ -133,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
try
{
AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -155,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index b3574ecba4..01eb2ba6a2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -67,12 +67,22 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(AMQMessage msg)
{
return isSuspended;
}
-
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
+ }
+
+ public Object getSendLock()
+ {
+ return new Object();
+ }
+
+
public void queueDeleted(AMQQueue queue)
{
}
@@ -92,7 +102,17 @@ public class SubscriptionTestHelper implements Subscription
return null;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public Queue<AMQMessage> getResendQueue()
+ {
+ return null;
+ }
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
//no-op
}
@@ -107,9 +127,14 @@ public class SubscriptionTestHelper implements Subscription
//no-op
}
+ public boolean isClosed()
+ {
+ return false;
+ }
+
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int hashCode()