summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-07 23:11:53 +0000
committerRobert Greig <rgreig@apache.org>2007-01-07 23:11:53 +0000
commit26d0286ef84e00fd27206b5e23aff5c54309a975 (patch)
treeb3ffd1ef57cbbc31faaf95466f91418421d44032 /java/broker/src/main
parent189816d88cc72f1053a7e7685b18883669c53d57 (diff)
downloadqpid-python-26d0286ef84e00fd27206b5e23aff5c54309a975.tar.gz
QPID-32: new model for holding and processing message in memory to support new persistent stores
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@493872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java544
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java165
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java115
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java644
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java102
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java91
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java133
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java118
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java190
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java91
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java148
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java208
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java13
58 files changed, 2562 insertions, 1182 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index c5b45659cf..999eb9f651 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -22,32 +22,27 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.LocalTransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Set;
-import java.util.HashSet;
+
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,7 +54,7 @@ public class AMQChannel
private final int _channelId;
- private boolean _transactional;
+ //private boolean _transactional;
private long _prefetch_HighWaterMark;
@@ -97,21 +92,24 @@ public class AMQChannel
private final MessageStore _messageStore;
- private final Object _unacknowledgedMessageMapLock = new Object();
-
- private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
-
- private long _lastDeliveryTag;
+ private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private final MessageRouter _exchanges;
- private final TxnBuffer _txnBuffer;
+ private TransactionalContext _txnContext;
+
+ /**
+ * A context used by the message store enabling it to track context for a given channel even across
+ * thread boundaries
+ */
+ private final StoreContext _storeContext = new StoreContext();
+
+ private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private TxAck ackOp;
+ private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
- private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
private Set<Long> _browsedAcks = new HashSet<Long>();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
@@ -122,22 +120,29 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _txnBuffer = new TxnBuffer(_messageStore);
+ // by default the session is non-transactional
+ _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- public int getChannelId()
+ /**
+ * Sets this channel to be part of a local transaction
+ */
+ public void setLocalTransactional()
{
- return _channelId;
+ _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages);
}
public boolean isTransactional()
{
- return _transactional;
+ // this does not look great but there should only be one "non-transactional"
+ // transactional context, while there could be several transactional ones in
+ // theory
+ return !(_txnContext instanceof NonTransactionalContext);
}
- public void setTransactional(boolean transactional)
+ public int getChannelId()
{
- _transactional = transactional;
+ return _channelId;
}
public long getPrefetchCount()
@@ -173,7 +178,9 @@ public class AMQChannel
public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore, publishBody);
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody,
+ _txnContext);
+ // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
@@ -182,92 +189,60 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
+ throw new AMQException("Received content header without previously receiving a BasicPublish frame");
}
else
{
_currentMessage.setContentHeaderBody(contentHeaderBody);
- // check and route if header says body length is zero
+ routeCurrentMessage();
+ _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+
+ // check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
- routeCurrentMessage();
+ _currentMessage = null;
}
}
}
- public void publishContentBody(ContentBody contentBody)
+ public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
throws AMQException
{
if (_currentMessage == null)
{
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_currentMessage.getContentHeaderBody() == null)
+
+ // returns true iff the message was delivered (i.e. if all data was
+ // received
+ try
{
- throw new AMQException("Received content body without previously receiving a content header");
+ if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
+ {
+ // callback to allow the context to do any post message processing
+ // primary use is to allow message return processing in the non-tx case
+ _txnContext.messageProcessed(protocolSession);
+ _currentMessage = null;
+ }
}
-
- _currentMessage.addContentBodyFrame(contentBody);
- if (_currentMessage.isAllContentReceived())
+ catch (AMQException e)
{
- routeCurrentMessage();
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
}
}
protected void routeCurrentMessage() throws AMQException
{
- if (_transactional)
+ try
{
- //don't create a transaction unless needed
- if (_currentMessage.isPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- }
-
- //A publication will result in the enlisting of several
- //TxnOps. The first is an op that will store the message.
- //Following that (and ordering is important), an op will
- //be added for every queue onto which the message is
- //enqueued. Finally a cleanup op will be added to decrement
- //the reference associated with the routing.
- Store storeOp = new Store(_currentMessage);
- _txnBuffer.enlist(storeOp);
- _currentMessage.setTxnBuffer(_txnBuffer);
- try
- {
- _exchanges.routeContent(_currentMessage);
- _txnBuffer.enlist(new Cleanup(_currentMessage));
- }
- catch (RequiredDeliveryException e)
- {
- //Can only be due to the mandatory flag, as no attempt
- //has yet been made to deliver the message. The
- //message will thus not have been delivered to any
- //queue so we can return the message (without killing
- //the transaction) and for efficiency remove the store
- //operation from the buffer.
- _txnBuffer.cancel(storeOp);
- throw e;
- }
- finally
- {
- _currentMessage = null;
- }
+ _exchanges.routeContent(_currentMessage);
}
- else
+ catch (NoRouteException e)
{
- try
- {
- _exchanges.routeContent(_currentMessage);
- //following check implements the functionality
- //required by the 'immediate' flag:
- _currentMessage.checkDeliveredToConsumer();
- }
- finally
- {
- _currentMessage.decrementReference();
- _currentMessage = null;
- }
+ _returnMessages.add(e);
}
}
@@ -329,13 +304,7 @@ public class AMQChannel
*/
public void close(AMQProtocolSession session) throws AMQException
{
- if (_transactional)
- {
- synchronized(_txnBuffer)
- {
- _txnBuffer.rollback();//releases messages
- }
- }
+ _txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
}
@@ -353,41 +322,32 @@ public class AMQChannel
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message
- * @param deliveryTag
- * @param queue
+ * @param message the message that was delivered
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
+ * the delivery tag)
+ * @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized(_unacknowledgedMessageMapLock)
- {
- _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
- _lastDeliveryTag = deliveryTag;
- checkSuspension();
- }
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ checkSuspension();
}
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
* May result in delivery to this same channel or to other subscribers.
+ * @throws org.apache.qpid.AMQException if the requeue fails
*/
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Map<Long, UnacknowledgedMessage> currentList;
- synchronized(_unacknowledgedMessageMapLock)
- {
- currentList = _unacknowledgedMessageMap;
- _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
- }
+ Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- for (UnacknowledgedMessage unacked : currentList.values())
+ for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- unacked.message.setTxnBuffer(null);
-
- unacked.queue.deliver(unacked.message);
+ _txnContext.deliver(unacked.message, unacked.queue);
}
}
}
@@ -395,53 +355,62 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(AMQProtocolSession session)
+ public void resend(final AMQProtocolSession session) throws AMQException
{
- //messages go to this channel
- synchronized(_unacknowledgedMessageMapLock)
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- long deliveryTag = entry.getKey();
- String consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
+ long deliveryTag = message.deliveryTag;
+ String consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
msg.setRedelivered(true);
- session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ // false means continue processing
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
* Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
* messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item sine we may get an ack for a delivery tag that was generated from the
+ * actually removing the item since we may get an ack for a delivery tag that was generated from the
* deleted queue.
*
- * @param queue
+ * @param queue the queue that has been deleted
+ * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(final AMQQueue queue) throws AMQException
{
- synchronized(_unacknowledgedMessageMapLock)
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- final UnacknowledgedMessage unackedMsg = unacked.getValue();
- // we can compare the reference safely in this case
- if (unackedMsg.queue == queue)
+ if (message.queue == queue)
{
- unackedMsg.queue = null;
try
{
- unackedMsg.message.decrementReference();
+ message.discard(_storeContext);
+ message.queue = null;
}
catch (AMQException e)
{
- _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
+ _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
e, e);
}
}
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -454,150 +423,7 @@ public class AMQChannel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- if (_transactional)
- {
- //check that the tag exists to give early failure
- if (!multiple || deliveryTag > 0)
- {
- checkAck(deliveryTag);
- }
- //we use a single txn op for all acks and update this op
- //as new acks come in. If this is the first ack in the txn
- //we will need to create and enlist the op.
- if (ackOp == null)
- {
- ackOp = new TxAck(new AckMap());
- _txnBuffer.enlist(ackOp);
- }
- //update the op to include this ack request
- if (multiple && deliveryTag == 0)
- {
- synchronized(_unacknowledgedMessageMapLock)
- {
- //if have signalled to ack all, that refers only
- //to all at this time
- ackOp.update(_lastDeliveryTag, multiple);
- }
- }
- else
- {
- ackOp.update(deliveryTag, multiple);
- }
- }
- else
- {
- handleAcknowledgement(deliveryTag, multiple);
- }
- }
-
- private void checkAck(long deliveryTag) throws AMQException
- {
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
- }
- }
- }
-
- private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
- " and multiple " + multiple);
- }
- if (multiple)
- {
- LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (deliveryTag == 0)
- {
- //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
- _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
- acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
- _unacknowledgedMessageMap.clear();
- }
- else
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
- }
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
-
- while (i.hasNext())
- {
-
- Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
-
- if (unacked.getKey() > deliveryTag)
- {
- //This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
- }
-
- i.remove();
-
- acked.add(unacked.getValue());
- if (unacked.getKey() == deliveryTag)
- {
- break;
- }
- }
- }
- }// synchronized
-
- if (_log.isTraceEnabled())
- {
- _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
- }
-
- for (UnacknowledgedMessage msg : acked)
- {
- if (!_browsedAcks.contains(deliveryTag))
- {
- msg.discard();
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
- }
-
- }
- else
- {
- UnacknowledgedMessage msg;
- synchronized(_unacknowledgedMessageMapLock)
- {
- msg = _unacknowledgedMessageMap.remove(deliveryTag);
- }
-
- if (msg == null)
- {
- _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- }
-
- if (!_browsedAcks.contains(deliveryTag))
- {
- msg.discard();
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
-
- if (_log.isTraceEnabled())
- {
- _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
- }
- }
-
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
checkSuspension();
}
@@ -606,19 +432,22 @@ public class AMQChannel
*
* @return the map of unacknowledged messages
*/
- public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+ public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
{
return _unacknowledgedMessageMap;
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
private void checkSuspension()
{
boolean suspend;
- //noinspection SynchronizeOnNonFinalField
- synchronized(_unacknowledgedMessageMapLock)
- {
- suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
- }
+ suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
+
setSuspended(suspend);
}
@@ -628,11 +457,8 @@ public class AMQChannel
if (isSuspended && !suspended)
{
- synchronized(_unacknowledgedMessageMapLock)
- {
- // Continue being suspended if we are above the _prefetch_LowWaterMark
- suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
- }
+ // Continue being suspended if we are above the _prefetch_LowWaterMark
+ suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
}
boolean wasSuspended = _suspended.getAndSet(suspended);
@@ -661,33 +487,18 @@ public class AMQChannel
public void commit() throws AMQException
{
- if (ackOp != null)
- {
- ackOp.consolidate();
- if (ackOp.checkPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- }
- ackOp = null;//already enlisted, after commit will reset regardless of outcome
- }
-
- _txnBuffer.commit();
- //TODO: may need to return 'immediate' messages at this point
+ _txnContext.commit();
}
public void rollback() throws AMQException
{
- //need to protect rollback and close from each other...
- synchronized(_txnBuffer)
- {
- _txnBuffer.rollback();
- }
+ _txnContext.rollback();
}
public String toString()
{
StringBuilder sb = new StringBuilder(30);
- sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
+ sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
sb.append("/").append(_prefetch_HighWaterMark);
return sb.toString();
@@ -703,121 +514,18 @@ public class AMQChannel
return _defaultQueue;
}
- public void processReturns(AMQProtocolSession session)
+ public StoreContext getStoreContext()
{
- for (AMQDataBlock block : _returns)
- {
- session.writeFrame(block);
- }
- _returns.clear();
- }
-
- public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
- {
- _browsedAcks.add(deliveryTag);
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ return _storeContext;
}
- //we use this wrapper to ensure we are always using the correct
- //map instance (its not final unfortunately)
- private class AckMap implements UnacknowledgedMessageMap
+ public void processReturns(AMQProtocolSession session) throws AMQException
{
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
- {
- impl().collect(deliveryTag, multiple, msgs);
- }
-
- public void remove(List<UnacknowledgedMessage> msgs)
- {
- impl().remove(msgs);
- }
-
- private UnacknowledgedMessageMap impl()
- {
- return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
- }
- }
-
- private class Store implements TxnOp
- {
- //just use this to do a store of the message during the
- //prepare phase. Any enqueueing etc is done by TxnOps enlisted
- //by the queues themselves.
- private final AMQMessage _msg;
-
- Store(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- _msg.storeMessage();
- //the routers reference can now be released
- _msg.decrementReference();
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit()
- {
- }
-
- public void rollback()
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), bouncedMessage.getMessage());
}
+ _returnMessages.clear();
}
-
- private class Cleanup implements TxnOp
- {
- private final AMQMessage _msg;
-
- Cleanup(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- }
-
- public void undoPrepare()
- {
- //don't need to do anything here, if the store's txn failed
- //when processing prepare then the message was not stored
- //or enqueued on any queues and can be discarded
- }
-
- public void commit()
- {
- //The routers reference can now be released. This is done
- //here to ensure that it happens after the queues that
- //enqueue it have incremented their counts (which as a
- //memory only operation is done in the commit phase).
- try
- {
- _msg.decrementReference();
- }
- catch (AMQException e)
- {
- _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
- }
- try
- {
- _msg.checkDeliveredToConsumer();
- }
- catch (NoConsumersException e)
- {
- //TODO: store this for delivery after the commit-ok
- _returns.add(e.getReturnMessage(_channelId));
- }
- }
-
- public void rollback()
- {
- }
- }
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 5e463646f9..b85e3603b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,17 +20,9 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
-import java.util.List;
-
/**
* Signals that a required delivery could not be made. This could be bacuse of
* the immediate flag being set and the queue having no consumers, or the mandatory
@@ -38,79 +30,24 @@ import java.util.List;
*/
public abstract class RequiredDeliveryException extends AMQException
{
- private final String _message;
- private final BasicPublishBody _publishBody;
- private final ContentHeaderBody _contentHeaderBody;
- private final List<ContentBody> _contentBodies;
+ private final AMQMessage _amqMessage;
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
- _message = message;
- _publishBody = payload.getPublishBody();
- _contentHeaderBody = payload.getContentHeaderBody();
- _contentBodies = payload.getContentBodies();
+ _amqMessage = payload;
+ payload.incrementReference();
}
- public RequiredDeliveryException(String message,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public AMQMessage getAMQMessage()
{
- super(message);
- _message = message;
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
- _contentBodies = contentBodies;
- }
-
- public BasicPublishBody getPublishBody()
- {
- return _publishBody;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public List<ContentBody> getContentBodies()
- {
- return _contentBodies;
- }
-
- public CompositeAMQDataBlock getReturnMessage(int channel)
- {
- // AMQP version change: All generated *Body classes are now version-aware.
- // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now.
- // TODO: Connect the version to that returned by the ProtocolInitiation
- // for this session.
- BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0);
- returnBody.exchange = _publishBody.exchange;
- returnBody.replyCode = getReplyCode();
- returnBody.replyText = _message;
- returnBody.routingKey = _publishBody.routingKey;
-
- AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
- AMQFrame returnFrame = new AMQFrame();
- returnFrame.bodyFrame = returnBody;
- returnFrame.channel = channel;
-
- allFrames[0] = returnFrame;
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 2; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
- }
-
- return new CompositeAMQDataBlock(allFrames);
+ return _amqMessage;
}
public int getErrorCode()
{
return getReplyCode();
- }
+ }
public abstract int getReplyCode();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index a204e176aa..c0a631080e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,13 +22,14 @@ package org.apache.qpid.server.ack;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
import java.util.List;
/**
* A TxnOp implementation for handling accumulated acks
- */
+ */
public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
@@ -44,14 +45,14 @@ public class TxAck implements TxnOp
public void update(long deliveryTag, boolean multiple)
{
- if(!multiple)
+ if (!multiple)
{
//have acked a single message that is not part of
//the previously acked region so record
//individually
_individual.add(deliveryTag);//_multiple && !multiple
}
- else if(deliveryTag > _deliveryTag)
+ else if (deliveryTag > _deliveryTag)
{
//have simply moved the last acked message on a
//bit
@@ -63,7 +64,7 @@ public class TxAck implements TxnOp
public void consolidate()
{
//lookup all the unacked messages that have been acked in this transaction
- if(_multiple)
+ if (_multiple)
{
//get all the unacked messages for the accumulated
//multiple acks
@@ -71,22 +72,22 @@ public class TxAck implements TxnOp
}
//get any unacked messages for individual acks outside the
//range covered by multiple acks
- for(long tag : _individual)
+ for (long tag : _individual)
{
if(_deliveryTag < tag)
{
- _map.collect(tag, false, _unacked);
+ _map.collect(tag, false, _unacked);
}
}
}
public boolean checkPersistent() throws AMQException
- {
+ {
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
- for(UnacknowledgedMessage msg : _unacked)
+ for (UnacknowledgedMessage msg : _unacked)
{
- if(msg.message.isPersistent())
+ if (msg.message.isPersistent())
{
return true;
}
@@ -94,34 +95,34 @@ public class TxAck implements TxnOp
return false;
}
- public void prepare() throws AMQException
+ public void prepare(StoreContext storeContext) throws AMQException
{
//make persistent changes, i.e. dequeue and decrementReference
- for(UnacknowledgedMessage msg : _unacked)
+ for (UnacknowledgedMessage msg : _unacked)
{
- msg.discard();
+ msg.discard(storeContext);
}
}
-
+
public void undoPrepare()
{
//decrementReference is annoyingly untransactional (due to
//in memory counter) so if we failed in prepare for full
//txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for(UnacknowledgedMessage msg : _unacked)
+ //in memory (persistent changes will be rolled back by store)
+ for (UnacknowledgedMessage msg : _unacked)
{
msg.message.incrementReference();
- }
+ }
}
- public void commit()
+ public void commit(StoreContext storeContext)
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
}
- public void rollback()
+ public void rollback(StoreContext storeContext)
{
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 0eff5a3cca..26f41e19af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@ package org.apache.qpid.server.ack;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
public class UnacknowledgedMessage
{
@@ -30,7 +31,7 @@ public class UnacknowledgedMessage
public final String consumerTag;
public final long deliveryTag;
public AMQQueue queue;
-
+
public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
{
this.queue = queue;
@@ -39,13 +40,13 @@ public class UnacknowledgedMessage
this.deliveryTag = deliveryTag;
}
- public void discard() throws AMQException
+ public void discard(StoreContext storeContext) throws AMQException
{
if (queue != null)
{
- message.dequeue(queue);
+ message.dequeue(storeContext, queue);
}
- message.decrementReference();
+ message.decrementReference(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index b0bbe224e3..b7a75d5b71 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,55 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
public interface UnacknowledgedMessageMap
{
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
- public void remove(List<UnacknowledgedMessage> msgs);
+ public interface Visitor
+ {
+ /**
+ * @param message the message being iterated over
+ * @return true to stop iteration, false to continue
+ * @throws AMQException
+ */
+ boolean callback(UnacknowledgedMessage message) throws AMQException;
+
+ void visitComplete();
+ }
+
+ void visit(Visitor visitor) throws AMQException;
+
+ void add(long deliveryTag, UnacknowledgedMessage message);
+
+ void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+
+ boolean contains(long deliveryTag) throws AMQException;
+
+ void remove(List<UnacknowledgedMessage> msgs);
+
+ UnacknowledgedMessage remove(long deliveryTag);
+
+ void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException;
+
+ Collection<UnacknowledgedMessage> cancelAllMessages();
+
+ void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
+
+ int size();
+
+ void clear();
+
+ UnacknowledgedMessage get(long deliveryTag);
+
+ /**
+ * Get the set of delivery tags that are outstanding.
+ * @return a set of delivery tags
+ */
+ Set<Long> getDeliveryTags();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index eda3233e56..1f4333549a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,19 +20,34 @@
*/
package org.apache.qpid.server.ack;
-import java.util.List;
-import java.util.Map;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.AMQException;
+
+import java.util.*;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private final Object _lock;
+ private final Object _lock = new Object();
+
private Map<Long, UnacknowledgedMessage> _map;
- public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+ private long _lastDeliveryTag;
+
+ private final int _prefetchLimit;
+
+ public UnacknowledgedMessageMapImpl(int prefetchLimit)
+ {
+ _prefetchLimit = prefetchLimit;
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+ }
+
+ /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
{
_lock = lock;
_map = map;
- }
+ } */
public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
{
@@ -47,28 +62,151 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
+ public boolean contains(long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ return _map.containsKey(deliveryTag);
+ }
+ }
+
public void remove(List<UnacknowledgedMessage> msgs)
{
- synchronized(_lock)
+ synchronized (_lock)
{
for(UnacknowledgedMessage msg : msgs)
{
_map.remove(msg.deliveryTag);
- }
+ }
+ }
+ }
+
+ public UnacknowledgedMessage remove(long deliveryTag)
+ {
+ synchronized (_lock)
+ {
+ return _map.remove(deliveryTag);
+ }
+ }
+
+ public void visit(Visitor visitor) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ for (UnacknowledgedMessage msg: currentEntries)
+ {
+ visitor.callback(msg);
+ }
+ visitor.visitComplete();
+ }
+ }
+
+ public void add(long deliveryTag, UnacknowledgedMessage message)
+ {
+ synchronized( _lock)
+ {
+ _map.put(deliveryTag, message);
+ _lastDeliveryTag = deliveryTag;
+ }
+ }
+
+ public Collection<UnacknowledgedMessage> cancelAllMessages()
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ return currentEntries;
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
+ throws AMQException
+ {
+ synchronized (_lock)
+ {
+ txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
+ }
+ }
+
+ public int size()
+ {
+ synchronized (_lock)
+ {
+ return _map.size();
+ }
+ }
+
+ public void clear()
+ {
+ synchronized (_lock)
+ {
+ _map.clear();
+ }
+ }
+
+ public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+ " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+ }
+
+ it.remove();
+
+ destination.add(unacked.getValue());
+ if (unacked.getKey() == deliveryTag)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ long deliveryTag = entry.getKey();
+ String consumerTag = entry.getValue().consumerTag;
+ AMQMessage msg = entry.getValue().message;
+
+ msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ }
}
}
- private UnacknowledgedMessage get(long key)
+ public UnacknowledgedMessage get(long key)
{
- synchronized(_lock)
+ synchronized (_lock)
{
return _map.get(key);
}
}
+ public Set<Long> getDeliveryTags()
+ {
+ synchronized (_lock)
+ {
+ return _map.keySet();
+ }
+ }
+
private void collect(long key, List<UnacknowledgedMessage> msgs)
{
- synchronized(_lock)
+ synchronized (_lock)
{
for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
{
@@ -76,9 +214,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if (entry.getKey() == key)
{
break;
- }
+ }
}
}
}
}
-
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index eb9d1acb59..99c08ad200 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -84,8 +84,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
final String exchange = payload.getPublishBody().exchange;
final Exchange exch = _exchangeMap.get(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
- // the JmsPublish being received (where the exchange is validated) and the final
+ // the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
+ // TODO: check where the exchange is validated
if (exch == null)
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index d4069fa315..7b28161263 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -168,8 +168,7 @@ public class DestNameExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- BasicPublishBody publishBody = payload.getPublishBody();
-
+ final BasicPublishBody publishBody = payload.getPublishBody();
final String routingKey = publishBody.routingKey;
final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
if (queues == null || queues.isEmpty())
@@ -193,7 +192,7 @@ public class DestNameExchange extends AbstractExchange
for (AMQQueue q : queues)
{
- q.deliver(payload);
+ payload.enqueue(q);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 139307488e..c341f30ab6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -171,7 +171,7 @@ public class DestWildExchange extends AbstractExchange
// TODO: modify code generator to add clone() method then clone the deliver body
// without this addition we have a race condition - we will be modifying the body
// before the encoder has encoded the body for delivery
- q.deliver(payload);
+ payload.enqueue(q);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 824e85dc5c..8ef5f0ab29 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
public interface Exchange
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 229502d2a6..dcb64e2d30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -145,7 +145,7 @@ public class HeadersExchange extends AbstractExchange
* <attributename>=<value>,<attributename>=<value>,...
* @param queueName
* @param binding
- * @throws JMException
+ * @throws javax.management.JMException
*/
public void createNewBinding(String queueName, String binding) throws JMException
{
@@ -192,7 +192,7 @@ public class HeadersExchange extends AbstractExchange
{
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
- boolean delivered = false;
+ boolean routed = false;
for (Registration e : _bindings)
{
if (e.binding.matches(headers))
@@ -202,11 +202,11 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
headers + " to " + e.queue.getName());
}
- e.queue.deliver(payload);
- delivered = true;
+ payload.enqueue(e.queue);
+ routed = true;
}
}
- if (!delivered)
+ if (!routed)
{
String msg = "Exchange " + getName() + ": message not routable.";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
index 70b80f65da..7508e80f7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
/**
* Separated out from the ExchangeRegistry interface to allow components
@@ -33,6 +33,7 @@ public interface MessageRouter
/**
* Routes content through exchanges, delivering it to 1 or more queues.
* @param message the message to be routed
+ *
* @throws org.apache.qpid.AMQException if something goes wrong delivering data
*/
void routeContent(AMQMessage message) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
index 0aa5739c1c..c536f77dde 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -58,7 +59,8 @@ public abstract class ArithmeticExpression extends BinaryExpression {
throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
}
- public String getExpressionSymbol() {
+ public String getExpressionSymbol()
+ {
return "+";
}
};
@@ -193,7 +195,8 @@ public abstract class ArithmeticExpression extends BinaryExpression {
}
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Object lvalue = left.evaluate(message);
if (lvalue == null) {
return null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
index b66de3fbc5..de71e95049 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -33,13 +34,14 @@ import javax.jms.JMSException;
*
* @version $Revision$
*/
-public interface BooleanExpression extends Expression {
-
+public interface BooleanExpression extends Expression
+{
+
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException;
+ public boolean matches(AMQMessage message) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
index 13d278cf65..07391098ce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.util.HashSet;
import java.util.List;
@@ -123,7 +124,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
/**
* org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
*/
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rv = this.getRight().evaluate(message);
@@ -139,7 +140,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
@@ -199,7 +200,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
private static BooleanExpression doCreateEqual(Expression left, Expression right) {
return new ComparisonExpression(left, right) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object lv = left.evaluate(message);
Object rv = right.evaluate(message);
@@ -340,7 +341,8 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
super(left, right);
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Comparable lv = (Comparable) left.evaluate(message);
if (lv == null) {
return null;
@@ -457,7 +459,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
protected abstract boolean asBoolean(int answer);
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
index 9bde712da2..2cd305d4b1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.math.BigDecimal;
@@ -29,19 +30,24 @@ import javax.jms.JMSException;
/**
* Represents a constant expression
- *
+ *
* @version $Revision$
*/
-public class ConstantExpression implements Expression {
+public class ConstantExpression implements Expression
+{
- static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
- public BooleanConstantExpression(Object value) {
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ {
+ public BooleanConstantExpression(Object value)
+ {
super(value);
}
- public boolean matches(AMQMessage message) throws JMSException {
+
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
- return object!=null && object==Boolean.TRUE;
- }
+ return object != null && object == Boolean.TRUE;
+ }
}
public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
@@ -50,73 +56,92 @@ public class ConstantExpression implements Expression {
private Object value;
- public static ConstantExpression createFromDecimal(String text) {
-
- // Strip off the 'l' or 'L' if needed.
- if( text.endsWith("l") || text.endsWith("L") )
- text = text.substring(0, text.length()-1);
-
- Number value;
- try {
- value = new Long(text);
- } catch ( NumberFormatException e) {
- // The number may be too big to fit in a long.
- value = new BigDecimal(text);
- }
-
+ public static ConstantExpression createFromDecimal(String text)
+ {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L"))
+ {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ Number value;
+ try
+ {
+ value = new Long(text);
+ }
+ catch (NumberFormatException e)
+ {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFromHex(String text) {
+ public static ConstantExpression createFromHex(String text)
+ {
Number value = new Long(Long.parseLong(text.substring(2), 16));
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFromOctal(String text) {
+ public static ConstantExpression createFromOctal(String text)
+ {
Number value = new Long(Long.parseLong(text, 8));
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFloat(String text) {
+ public static ConstantExpression createFloat(String text)
+ {
Number value = new Double(text);
return new ConstantExpression(value);
}
- public ConstantExpression(Object value) {
+ public ConstantExpression(Object value)
+ {
this.value = value;
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
return value;
}
- public Object getValue() {
+ public Object getValue()
+ {
return value;
- }
+ }
/**
* @see java.lang.Object#toString()
*/
- public String toString() {
- if (value == null) {
+ public String toString()
+ {
+ if (value == null)
+ {
return "NULL";
}
- if (value instanceof Boolean) {
+ if (value instanceof Boolean)
+ {
return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
}
- if (value instanceof String) {
+ if (value instanceof String)
+ {
return encodeString((String) value);
}
return value.toString();
@@ -127,7 +152,8 @@ public class ConstantExpression implements Expression {
*
* @see java.lang.Object#hashCode()
*/
- public int hashCode() {
+ public int hashCode()
+ {
return toString().hashCode();
}
@@ -136,9 +162,11 @@ public class ConstantExpression implements Expression {
*
* @see java.lang.Object#equals(java.lang.Object)
*/
- public boolean equals(Object o) {
+ public boolean equals(Object o)
+ {
- if (o == null || !this.getClass().equals(o.getClass())) {
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
return false;
}
return toString().equals(o.toString());
@@ -153,12 +181,15 @@ public class ConstantExpression implements Expression {
* @param s
* @return
*/
- public static String encodeString(String s) {
+ public static String encodeString(String s)
+ {
StringBuffer b = new StringBuffer();
b.append('\'');
- for (int i = 0; i < s.length(); i++) {
+ for (int i = 0; i < s.length(); i++)
+ {
char c = s.charAt(i);
- if (c == '\'') {
+ if (c == '\'')
+ {
b.append(c);
}
b.append(c);
@@ -166,5 +197,5 @@ public class ConstantExpression implements Expression {
b.append('\'');
return b.toString();
}
-
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
index a15c15fb91..3b5debd3ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -32,11 +33,12 @@ import javax.jms.JMSException;
*
* @version $Revision$
*/
-public interface Expression {
+public interface Expression
+{
/**
* @return the value of this expression
*/
- public Object evaluate(AMQMessage message) throws JMSException;
+ public Object evaluate(AMQMessage message) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 4884067237..5f505fbeba 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -64,7 +64,7 @@ public class JMSSelectorFilter implements MessageFilter
_logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
return match;
}
- catch (JMSException e)
+ catch (AMQException e)
{
//fixme this needs to be sorted.. it shouldn't happen
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
index 714d8c23f5..e6ad98cb8b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -35,7 +36,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lv = (Boolean) left.evaluate(message);
// Can we do an OR shortcut??
@@ -56,7 +57,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lv = (Boolean) left.evaluate(message);
@@ -85,9 +86,9 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
super(left, right);
}
- abstract public Object evaluate(AMQMessage message) throws JMSException;
+ abstract public Object evaluate(AMQMessage message) throws AMQException;
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
index 283d324ff6..47ca930d12 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -20,15 +20,9 @@
*/
package org.apache.qpid.server.filter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.log4j.Logger;
-
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
public class NoConsumerFilter implements MessageFilter
{
@@ -36,7 +30,7 @@ public class NoConsumerFilter implements MessageFilter
public NoConsumerFilter() throws AMQException
- {
+ {
_logger.info("Created NoConsumerFilter");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index f3e9965c2e..7d6a98df84 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -47,7 +47,7 @@ public class PropertyExpression implements Expression
interface SubExpression
{
- public Object evaluate(AMQMessage message);
+ public Object evaluate(AMQMessage message) throws AMQException;
}
interface JMSExpression
@@ -65,7 +65,7 @@ public class PropertyExpression implements Expression
}
- public Object evaluate(AMQMessage message)
+ public Object evaluate(AMQMessage message) throws AMQException
{
JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
if (msg != null)
@@ -226,7 +226,7 @@ public class PropertyExpression implements Expression
jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(AMQMessage message) throws JMSException
+ public Object evaluate(AMQMessage message) throws AMQException
{
// try
// {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
index 49ff147411..abc56f04d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.math.BigDecimal;
import java.util.Collection;
@@ -43,7 +44,8 @@ public abstract class UnaryExpression implements Expression {
public static Expression createNegate(Expression left) {
return new UnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
return null;
@@ -74,7 +76,7 @@ public abstract class UnaryExpression implements Expression {
final Collection inList = t;
return new BooleanUnaryExpression(right) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
@@ -126,7 +128,7 @@ public abstract class UnaryExpression implements Expression {
super(left);
}
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
@@ -135,7 +137,7 @@ public abstract class UnaryExpression implements Expression {
public static BooleanExpression createNOT(BooleanExpression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lvalue = (Boolean) right.evaluate(message);
if (lvalue == null) {
return null;
@@ -159,7 +161,7 @@ public abstract class UnaryExpression implements Expression {
public static BooleanExpression createBooleanCast(Expression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null)
return null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
index ab952b6fea..85402e0781 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -31,6 +31,7 @@ import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
/**
* Used to evaluate an XPath Expression in a JMS selector.
@@ -75,7 +76,7 @@ public final class XPathExpression implements BooleanExpression {
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(AMQMessage message) throws JMSException;
+ public boolean evaluate(AMQMessage message) throws AMQException;
}
XPathExpression(String xpath) {
@@ -97,7 +98,7 @@ public final class XPathExpression implements BooleanExpression {
}
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -122,7 +123,8 @@ public final class XPathExpression implements BooleanExpression {
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
index 53764cbf75..da8a61650a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -18,6 +18,7 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
//
@@ -35,7 +36,7 @@ public final class XQueryExpression implements BooleanExpression {
this.xpath = xpath;
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
return Boolean.FALSE;
}
@@ -48,7 +49,8 @@ public final class XQueryExpression implements BooleanExpression {
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
index 4b78fd18df..f74e0cedec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -34,6 +34,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
//import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xpath.CachedXPathAPI;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;
import org.xml.sax.InputSource;
@@ -46,17 +47,26 @@ public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
this.xpath = xpath;
}
- public boolean evaluate(AMQMessage m) throws JMSException {
- if( m instanceof TextMessage ) {
- String text = ((TextMessage)m).getText();
- return evaluate(text);
- } else if ( m instanceof BytesMessage ) {
- BytesMessage bm = (BytesMessage) m;
- byte data[] = new byte[(int) bm.getBodyLength()];
- bm.readBytes(data);
- return evaluate(data);
- }
- return false;
+ public boolean evaluate(AMQMessage m) throws AMQException
+ {
+ try
+ {
+
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ return false;
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("Error evaluting message: " + e, e);
+ }
}
private boolean evaluate(byte[] data) {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 7fcad5bbf3..c67b86af09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -49,15 +49,18 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
- try{
+ try
+ {
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
- channel.processReturns(protocolSession);
- }catch(AMQException e){
+ channel.processReturns(protocolSession);
+ }
+ catch(AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 7df3825d8a..f4738ff01b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -47,7 +47,7 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
+ protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
index 72e241ea0a..376f88cbf1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
import javax.jms.Message;
import javax.jms.JMSException;
@@ -37,7 +38,7 @@ public class JMSMessage implements MessageDecorator
private AMQMessage _message;
private BasicContentHeaderProperties _properties;
- public JMSMessage(AMQMessage message)
+ public JMSMessage(AMQMessage message) throws AMQException
{
_message = message;
ContentHeaderBody contentHeader = message.getContentHeaderBody();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 831117d2c6..08668f0f6a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -40,7 +40,6 @@ import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -201,17 +200,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
else
{
- try
- {
- contentFrameReceived(frame);
- }
- catch (RequiredDeliveryException e)
- {
- //need to return the message:
- _logger.info("Returning message to " + this + " channel " + frame.channel
- + ": " + e.getMessage());
- writeFrame(e.getReturnMessage(frame.channel));
- }
+ contentFrameReceived(frame);
}
}
}
@@ -287,7 +276,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
+ getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame, this);
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afe4ea95b9..f30667690f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -21,19 +21,17 @@
package org.apache.qpid.server.queue;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
-import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,45 +41,28 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
public static final String JMS_MESSAGE = "jms.message";
+ /**
+ * Used in clustering
+ */
private final Set<Object> _tokens = new HashSet<Object>();
+ /**
+ * Only use in clustering - should ideally be removed?
+ */
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
-
- private ContentHeaderBody _contentHeaderBody;
-
- private List<ContentBody> _contentBodies;
-
- private boolean _redelivered;
-
private final long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
+ private AMQMessageHandle _messageHandle;
- /**
- * The message store in which this message is contained.
- */
- private transient final MessageStore _store;
-
- /**
- * For non transactional publishes, a message can be stored as
- * soon as it is complete. For transactional messages it doesnt
- * need to be stored until the transaction is committed.
- */
- private boolean _storeWhenComplete;
-
- /**
- * TxnBuffer for transactionally published messages
- */
- private TxnBuffer _txnBuffer;
+ // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+ private TransactionalContext _txnContext;
/**
* Flag to indicate whether message has been delivered to a
@@ -89,178 +70,245 @@ public class AMQMessage
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+
private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
- private AtomicBoolean _taken;
+ private AtomicBoolean _taken = new AtomicBoolean(false);
+ private TransientMessageData _transientMessageData = new TransientMessageData();
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not
+ * keep all the data in memory therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
- this(messageStore, publishBody, true);
- }
+ private int _channel;
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
- {
- _messageId = messageStore.getNewMessageId();
- _publishBody = publishBody;
- _store = messageStore;
- _contentBodies = new LinkedList<ContentBody>();
- _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
- _storeWhenComplete = storeWhenComplete;
- _taken = new AtomicBoolean(false);
- }
+ private int _index = -1;
- public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
- throws AMQException
+ private BodyFrameIterator(int channel)
+ {
+ _channel = channel;
+ }
- {
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
- _contentBodies = contentBodies;
- _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
- _messageId = messageId;
- _store = store;
- storeMessage();
- }
+ public boolean hasNext()
+ {
+ try
+ {
+ return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to get body count: " + e, e);
+ return false;
+ }
+ }
- public AMQMessage(MessageStore store, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
- throws AMQException
- {
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
- }
+ public AMQDataBlock next()
+ {
+ try
+ {
+ ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+ return ContentBody.createAMQFrame(_channel, cb);
+ }
+ catch (AMQException e)
+ {
+ // have no choice but to throw a runtime exception
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
- protected AMQMessage(AMQMessage msg) throws AMQException
- {
- this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
- }
+ }
- public void storeMessage() throws AMQException
- {
- if (isPersistent())
+ public void remove()
{
- _store.put(this);
+ throw new UnsupportedOperationException();
}
}
- public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel)
+ private class BodyContentIterator implements Iterator<ContentBody>
{
- AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
- allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 1; i < allFrames.length; i++)
+ private int _index = -1;
+
+ public boolean hasNext()
{
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1));
+ try
+ {
+ return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error getting body count: " + e, e);
+ return false;
+ }
}
- return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
- }
- public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
- {
-
- AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
+ public ContentBody next()
+ {
+ try
+ {
+ return _messageHandle.getContentBody(_messageId, ++_index);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+ }
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 2; i < allFrames.length; i++)
+ public void remove()
{
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
+ throw new UnsupportedOperationException();
}
- return new CompositeAMQDataBlock(allFrames);
}
- public List<AMQBody> getPayload()
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext)
{
- List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size());
- payload.add(_publishBody);
- payload.add(_contentHeaderBody);
- payload.addAll(_contentBodies);
- return payload;
+ _messageId = messageId;
+ _txnContext = txnContext;
+ _transientMessageData.setPublishBody(publishBody);
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
+ _taken = new AtomicBoolean(false);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message created with id " + messageId);
+ }
}
- public BasicPublishBody getPublishBody()
+ /**
+ * Used when recovering, i.e. when the message store is creating references to messages.
+ * In that case, the normal enqueue/routingComplete is not done since the recovery process
+ * is responsible for routing the messages to queues.
+ * @param messageId
+ * @param store
+ * @param factory
+ * @throws AMQException
+ */
+ public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
{
- return _publishBody;
+ _messageId = messageId;
+ _messageHandle = factory.createMessageHandle(messageId, store, true);
+ _transientMessageData = null;
}
- public ContentHeaderBody getContentHeaderBody()
+ /**
+ * Used in testing only. This allows the passing of the content header immediately
+ * on construction.
+ * @param messageId
+ * @param publishBody
+ * @param txnContext
+ * @param contentHeader
+ */
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
{
- return _contentHeaderBody;
+ this(messageId, publishBody, txnContext);
+ setContentHeaderBody(contentHeader);
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
- {
- _contentHeaderBody = contentHeaderBody;
- if (_storeWhenComplete && isAllContentReceived())
+ /**
+ * Used in testing only. This allows the passing of the content header and some body fragments on
+ * construction.
+ * @param messageId
+ * @param publishBody
+ * @param txnContext
+ * @param contentHeader
+ * @param destinationQueues
+ * @param contentBodies
+ * @throws AMQException
+ */
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
+ List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
+ MessageHandleFactory messageHandleFactory) throws AMQException
+ {
+ this(messageId, publishBody, txnContext, contentHeader);
+ _transientMessageData.setDestinationQueues(destinationQueues);
+ routingComplete(messageStore, storeContext, messageHandleFactory);
+ for (ContentBody cb : contentBodies)
{
- storeMessage();
+ addContentBodyFrame(storeContext, cb);
}
}
- public List<ContentBody> getContentBodies()
+ protected AMQMessage(AMQMessage msg) throws AMQException
{
- return _contentBodies;
+ _messageId = msg._messageId;
+ _messageHandle = msg._messageHandle;
+ _txnContext = msg._txnContext;
+ _deliveredToConsumer = msg._deliveredToConsumer;
+ _transientMessageData = msg._transientMessageData;
}
- public void setContentBodies(List<ContentBody> contentBodies)
+ public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
{
- _contentBodies = contentBodies;
+ return new BodyFrameIterator(channel);
}
- public void addContentBodyFrame(ContentBody contentBody) throws AMQException
+ public Iterator<ContentBody> getContentBodyIterator()
{
- _contentBodies.add(contentBody);
- _bodyLengthReceived += contentBody.getSize();
- if (_storeWhenComplete && isAllContentReceived())
- {
- storeMessage();
- }
+ return new BodyContentIterator();
}
- public boolean isAllContentReceived()
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ if (_transientMessageData != null)
+ {
+ return _transientMessageData.getContentHeaderBody();
+ }
+ else
+ {
+ return _messageHandle.getContentHeaderBody(_messageId);
+ }
}
-
- public boolean isRedelivered()
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ throws AMQException
{
- return _redelivered;
+ _transientMessageData.setContentHeaderBody(contentHeaderBody);
}
- String getExchangeName()
+ public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException
{
- return _publishBody.exchange;
- }
+ final boolean persistent = isPersistent();
+ _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+ if (persistent)
+ {
+ _txnContext.beginTranIfNecessary();
+ }
- String getRoutingKey()
- {
- return _publishBody.routingKey;
- }
+ // enqueuing the messages ensure that if required the destinations are recorded to a
+ // persistent store
+ for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ {
+ _messageHandle.enqueue(storeContext, _messageId, q);
+ }
- boolean isImmediate()
- {
- return _publishBody.immediate;
+ if (_transientMessageData.getContentHeaderBody().bodySize == 0)
+ {
+ deliver(storeContext);
+ }
}
- NoConsumersException getNoConsumersException(String queue)
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
{
- return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
+ _transientMessageData.addBodyLength(contentBody.getSize());
+ _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody);
+ if (isAllContentReceived())
+ {
+ deliver(storeContext);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- public void setRedelivered(boolean redelivered)
+ public boolean isAllContentReceived() throws AMQException
{
- _redelivered = redelivered;
+ return _transientMessageData.isAllContentReceived();
}
public long getMessageId()
@@ -274,13 +322,20 @@ public class AMQMessage
public void incrementReference()
{
_referenceCount.incrementAndGet();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+ }
}
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
*/
- public void decrementReference() throws MessageCleanupException
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -290,7 +345,16 @@ public class AMQMessage
{
try
{
- _store.removeMessage(_messageId);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ }
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_messageHandle != null)
+ {
+ _messageHandle.removeMessage(storeContext, _messageId);
+ }
}
catch (AMQException e)
{
@@ -299,6 +363,17 @@ public class AMQMessage
throw new MessageCleanupException(_messageId, e);
}
}
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+ if (_referenceCount.get() < 0)
+ {
+ Thread.dumpStack();
+ }
+ }
+ }
}
public void setPublisher(AMQProtocolSession publisher)
@@ -311,6 +386,60 @@ public class AMQMessage
return _publisher;
}
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type) throws AMQException
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type) throws AMQException
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
+ }
+
public boolean checkToken(Object token)
{
if (_tokens.contains(token))
@@ -324,124 +453,215 @@ public class AMQMessage
}
}
+ /**
+ * Registers a queue to which this message is to be delivered. This is
+ * called from the exchange when it is routing the message. This will be called before any content bodies have
+ * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ *
+ * @param queue the queue
+ * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
+ */
public void enqueue(AMQQueue queue) throws AMQException
{
- //if the message is not persistent or the queue is not durable
- //we will not need to recover the association and so do not
- //need to record it
- if (isPersistent() && queue.isDurable())
- {
- _store.enqueueMessage(queue.getName(), _messageId);
- }
+ _transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
- //only record associations where both queue and message will survive
- //a restart, so only need to remove association if this is the case
- if (isPersistent() && queue.isDurable())
- {
- _store.dequeueMessage(queue.getName(), _messageId);
- }
+ _messageHandle.dequeue(storeContext, _messageId, queue);
}
public boolean isPersistent() throws AMQException
{
- if (_contentHeaderBody == null)
+ if (_transientMessageData != null)
{
- throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
+ return _transientMessageData.isPersistent();
}
+ else
+ {
+ return _messageHandle.isPersistent(_messageId);
+ }
+ }
- //todo remove literal values to a constant file such as AMQConstants in common
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @throws NoConsumersException if the message is marked for
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
+ */
+ public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
+ {
+ BasicPublishBody pb = getPublishBody();
+ if (pb.immediate && !_deliveredToConsumer)
+ {
+ throw new NoConsumersException(this);
+ }
}
- public void setTxnBuffer(TxnBuffer buffer)
+ public BasicPublishBody getPublishBody() throws AMQException
{
- _txnBuffer = buffer;
+ BasicPublishBody pb;
+ if (_transientMessageData != null)
+ {
+ pb = _transientMessageData.getPublishBody();
+ }
+ else
+ {
+ pb = _messageHandle.getPublishBody(_messageId);
+ }
+ return pb;
}
- public TxnBuffer getTxnBuffer()
+ public boolean isRedelivered()
{
- return _txnBuffer;
+ return _messageHandle.isRedelivered();
}
- /**
- * Called to enforce the 'immediate' flag.
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
- */
- public void checkDeliveredToConsumer() throws NoConsumersException
+ public void setRedelivered(boolean redelivered)
{
- if (isImmediate() && !_deliveredToConsumer)
- {
- throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
- }
+ _messageHandle.setRedelivered(redelivered);
}
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
- * And by selectors to determin if the message has already been sent
*/
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
}
- /**
- * Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
- */
- public boolean getDeliveredToConsumer()
+ private void deliver(StoreContext storeContext) throws AMQException
{
- return _deliveredToConsumer;
- }
-
+ // we get a reference to the destination queues now so that we can clear the
+ // transient message data as quickly as possible
+ List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
+ try
+ {
+ // first we allow the handle to know that the message has been fully received. This is useful if it is
+ // maintaining any calculated values based on content chunks
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(),
+ _transientMessageData.getContentHeaderBody());
- public MessageDecorator getDecodedMessage(String type)
- {
- MessageDecorator msgtype = null;
+ // we then allow the transactional context to do something with the message content
+ // now that it has all been received, before we attempt delivery
+ _txnContext.messageFullyReceived(isPersistent());
- if (_decodedMessages != null)
- {
- msgtype = _decodedMessages.get(type);
+ _transientMessageData = null;
- if (msgtype == null)
+ for (AMQQueue q : destinationQueues)
{
- msgtype = decorateMessage(type);
+ _txnContext.deliver(this, q);
}
}
-
- return msgtype;
+ finally
+ {
+ destinationQueues.clear();
+ decrementReference(storeContext);
+ }
}
- private MessageDecorator decorateMessage(String type)
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag)
+ throws AMQException
{
- MessageDecorator msgdec = null;
+ ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
- if (type.equals(JMS_MESSAGE))
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
{
- msgdec = new JMSMessage(this);
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
}
- if (msgdec != null)
+ //
+ // Now start writing out the other content bodies
+ //
+ while (bodyFrameIterator.hasNext())
{
- _decodedMessages.put(type, msgdec);
+ protocolSession.writeFrame(bodyFrameIterator.next());
}
- return msgdec;
}
- public boolean taken()
+ private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag)
+ throws AMQException
{
- return _taken.getAndSet(true);
+ BasicPublishBody pb = getPublishBody();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag,
+ deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
+ pb.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
- public void release()
+ private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException
{
- _taken.set(false);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
+ replyCode, replyText,
+ getPublishBody().routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ protocolSession.writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+ public String toString()
+ {
+ return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
+ _taken;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
new file mode 100644
index 0000000000..ef5d460b9b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * A pluggable way of getting message data. Implementations can provide intelligent caching for example or
+ * even no caching at all to minimise the broker memory footprint.
+ *
+ * The method all take a messageId to avoid having to store it in the instance - the AMQMessage container
+ * must already keen the messageId so it is pointless storing it twice.
+ */
+public interface AMQMessageHandle
+{
+ ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException;
+
+ /**
+ * @return the number of body frames associated with this message
+ */
+ int getBodyCount(long messageId) throws AMQException;
+
+ /**
+ * @return the size of the body
+ */
+ long getBodySize(long messageId) throws AMQException;
+
+ /**
+ * Get a particular content body
+ * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1
+ * @return a content body
+ * @throws IllegalArgumentException if the index is invalid
+ */
+ ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException;
+
+ void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException;
+
+ BasicPublishBody getPublishBody(long messageId) throws AMQException;
+
+ boolean isRedelivered();
+
+ void setRedelivered(boolean redelivered);
+
+ boolean isPersistent(long messageId) throws AMQException;
+
+ void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException;
+
+ void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
+
+ void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+
+ void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 101a2833a0..6f1018e753 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,8 +27,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import java.text.MessageFormat;
@@ -344,17 +343,17 @@ public class AMQQueue implements Managable, Comparable
/**
* Removes the AMQMessage from the top of the queue.
*/
- public void deleteMessageFromTop() throws AMQException
+ public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.removeAMessageFromTop();
+ _deliveryMgr.removeAMessageFromTop(storeContext);
}
/**
* removes all the messages from the queue.
*/
- public void clearQueue() throws AMQException
+ public void clearQueue(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.clearAllMessages();
+ _deliveryMgr.clearAllMessages(storeContext);
}
public void bind(String routingKey, Exchange exchange)
@@ -378,7 +377,7 @@ public class AMQQueue implements Managable, Comparable
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -443,30 +442,9 @@ public class AMQQueue implements Managable, Comparable
delete();
}
- public void deliver(AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- TxnBuffer buffer = msg.getTxnBuffer();
- if (buffer == null)
- {
- //non-transactional
- record(msg);
- process(msg);
- }
- else
- {
- buffer.enlist(new Deliver(msg));
- }
- }
-
- private void record(AMQMessage msg) throws AMQException
- {
- msg.enqueue(this);
- msg.incrementReference();
- }
-
- private void process(AMQMessage msg) throws FailedDequeueException
- {
- _deliveryMgr.deliver(getName(), msg);
+ _deliveryMgr.deliver(storeContext, getName(), msg);
try
{
msg.checkDeliveredToConsumer();
@@ -476,16 +454,16 @@ public class AMQQueue implements Managable, Comparable
{
// as this message will be returned, it should be removed
// from the queue:
- dequeue(msg);
+ dequeue(storeContext, msg);
}
}
- void dequeue(AMQMessage msg) throws FailedDequeueException
+ void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
- msg.dequeue(this);
- msg.decrementReference();
+ msg.dequeue(storeContext, this);
+ msg.decrementReference(storeContext);
}
catch (MessageCleanupException e)
{
@@ -511,10 +489,17 @@ public class AMQQueue implements Managable, Comparable
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg)
+ protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
{
_totalMessagesReceived++;
- _managedObject.checkForNotification(msg);
+ try
+ {
+ _managedObject.checkForNotification(msg);
+ }
+ catch (JMException e)
+ {
+ throw new AMQException("Unable to get notification from manage queue: " + e, e);
+ }
}
public boolean equals(Object o)
@@ -550,45 +535,4 @@ public class AMQQueue implements Managable, Comparable
_logger.debug(MessageFormat.format(msg, args));
}
}
-
- private class Deliver implements TxnOp
- {
- private final AMQMessage _msg;
-
- Deliver(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- //do the persistent part of the record()
- _msg.enqueue(AMQQueue.this);
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit()
- {
- //do the memeory part of the record()
- _msg.incrementReference();
- //then process the message
- try
- {
- process(_msg);
- }
- catch (FailedDequeueException e)
- {
- //TODO: is there anything else we can do here? I think not...
- _logger.error("Error during commit of a queue delivery: " + e, e);
- }
- }
-
- public void rollback()
- {
- }
- }
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 1bdf265a1b..77bbdf7b4b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -35,6 +36,7 @@ import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import java.util.List;
import java.util.ArrayList;
+import java.util.Iterator;
/**
* MBean class for AMQQueue. It implements all the management features exposed
@@ -43,6 +45,12 @@ import java.util.ArrayList;
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
+ /**
+ * Since the MBean is not associated with a real channel we can safely create our own store context
+ * for use in the few methods that require one.
+ */
+ private StoreContext _storeContext = new StoreContext();
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -165,7 +173,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
/**
* returns the size of messages(KB) in the queue.
*/
- public Long getQueueDepth()
+ public Long getQueueDepth() throws JMException
{
List<AMQMessage> list = _queue.getMessagesOnTheQueue();
if (list.size() == 0)
@@ -174,9 +182,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
long queueDepth = 0;
- for (AMQMessage message : list)
+ try
{
- queueDepth = queueDepth + getMessageSize(message);
+ for (AMQMessage message : list)
+ {
+ queueDepth = queueDepth + getMessageSize(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMException("Unable to get message size: " + e);
}
return (long) Math.round(queueDepth / 1000);
}
@@ -184,7 +199,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
/**
* returns size of message in bytes
*/
- private long getMessageSize(AMQMessage msg)
+ private long getMessageSize(AMQMessage msg) throws AMQException
{
if (msg == null)
{
@@ -197,7 +212,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg)
+ public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
// Check for threshold message count
Integer msgCount = getMessageCount();
@@ -233,13 +248,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop()
+ * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop
*/
public void deleteMessageFromTop() throws JMException
{
try
{
- _queue.deleteMessageFromTop();
+ _queue.deleteMessageFromTop(_storeContext);
}
catch (AMQException ex)
{
@@ -248,13 +263,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#clearQueue()
+ * @see org.apache.qpid.server.queue.AMQQueue#clearQueue
*/
public void clearQueue() throws JMException
{
try
{
- _queue.clearQueue();
+ _queue.clearQueue(_storeContext);
}
catch (AMQException ex)
{
@@ -273,11 +288,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
- List<ContentBody> cBodies = msg.getContentBodies();
+ Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
- if (cBodies != null)
+ while (cBodies.hasNext())
{
- for (ContentBody body : cBodies)
+ ContentBody body = cBodies.next();
+ if (body.getSize() != 0)
{
if (body.getSize() != 0)
{
@@ -290,17 +306,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
}
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+ try
{
- mimeType = headerProperties.getContentType();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
+ {
+ mimeType = headerProperties.getContentType();
+ encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ }
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+ catch (AMQException e)
+ {
+ throw new JMException("Error creating header attributes list: " + e);
}
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
/**
@@ -317,17 +339,24 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
List<AMQMessage> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- // Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ try
{
- AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
- String[] headerAttributes = headerProperties.toString().split(",");
- Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ {
+ AMQMessage msg = list.get(i - 1);
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ String[] headerAttributes = headerProperties.toString().split(",");
+ Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMException("Error creating message contents: " + e);
}
return _messageList;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index 022d3b9635..a2898ccdce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,18 +22,17 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
/**
@@ -46,16 +45,19 @@ public class ConcurrentDeliveryManager implements DeliveryManager
@Configured(path = "advanced.compressBufferOnQueue",
defaultValue = "false")
public boolean compressBufferOnQueue;
+
/**
* Holds any queued messages
*/
private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
//private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
* any time.
*/
private final AtomicBoolean _processing = new AtomicBoolean();
+
/**
* The subscriptions on the queue to whom messages are delivered
*/
@@ -67,7 +69,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
*/
private final AMQQueue _queue;
-
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
* to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
@@ -77,7 +78,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
*/
private ReentrantLock _lock = new ReentrantLock();
-
ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -101,14 +101,13 @@ public class ConcurrentDeliveryManager implements DeliveryManager
return hasQueuedMessages();
}
-
/**
* @param msg to enqueue
* @return true if we are queue this message
*/
- private boolean enqueue(AMQMessage msg)
+ private boolean enqueue(AMQMessage msg) throws AMQException
{
- if (msg.isImmediate())
+ if (msg.getPublishBody().immediate)
{
return false;
}
@@ -133,9 +132,9 @@ public class ConcurrentDeliveryManager implements DeliveryManager
}
}
- private void startQueueing(AMQMessage msg)
+ private void startQueueing(AMQMessage msg) throws AMQException
{
- if (!msg.isImmediate())
+ if (!msg.getPublishBody().immediate)
{
addMessageToQueue(msg);
}
@@ -143,7 +142,10 @@ public class ConcurrentDeliveryManager implements DeliveryManager
private boolean addMessageToQueue(AMQMessage msg)
{
- // Shrink the ContentBodies to their actual size to save memory.
+ // Shrink the ContentBodies to their actual size to save memory.
+ /* TODO need to reimplement this - probably not in this class though
+ * for obvious reasons
+
if (compressBufferOnQueue)
{
Iterator it = msg.getContentBodies().iterator();
@@ -153,16 +155,14 @@ public class ConcurrentDeliveryManager implements DeliveryManager
cb.reduceBufferToFit();
}
}
-
+ */
_messages.offer(msg);
return true;
}
-
public boolean hasQueuedMessages()
{
-
_lock.lock();
try
{
@@ -172,8 +172,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
{
_lock.unlock();
}
-
-
}
public int getQueueMessageCount()
@@ -203,21 +201,21 @@ public class ConcurrentDeliveryManager implements DeliveryManager
//no-op . This DM has no PreDeliveryQueues
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -226,7 +224,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
* Only one thread should ever execute this method concurrently, but
* it can do so while other threads invoke deliver().
*/
- private void processQueue()
+ private void processQueue() throws AMQException
{
try
{
@@ -296,7 +294,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
}
}
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
@@ -308,7 +306,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
Subscription s = _subscriptions.nextSubscriber(msg);
if (s == null)
{
- if (!msg.isImmediate())
+ if (!msg.getPublishBody().immediate)
{
// no subscribers yet so enter 'queueing' mode and queue this message
startQueueing(msg);
@@ -333,7 +331,18 @@ public class ConcurrentDeliveryManager implements DeliveryManager
boolean running = true;
while (running)
{
- processQueue();
+ try
+ {
+ processQueue();
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error processing queue: " + e, e);
+ _log.error("Delivery manager terminating.");
+ running = false;
+ _processing.set(false);
+ break;
+ }
//Check that messages have not been added since we did our last peek();
// Synchronize with the thread that adds to the queue.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f09e8213b1..8f0c3a5ec7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -26,6 +26,7 @@ import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
import java.util.Iterator;
@@ -99,10 +100,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- Iterator it = msg.getContentBodies().iterator();
+ Iterator<ContentBody> it = msg.getContentBodyIterator();
while (it.hasNext())
{
- ContentBody cb = (ContentBody) it.next();
+ ContentBody cb = it.next();
cb.reduceBufferToFit();
}
}
@@ -167,21 +168,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -223,7 +224,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//remove sent message from our queue.
messageQueue.poll();
}
- catch (FailedDequeueException e)
+ catch (AMQException e)
{
message.release();
_log.error("Unable to deliver message as dequeue failed: " + e, e);
@@ -279,11 +280,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _messages.poll();
}
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+ _log.debug(id() + "deliver :" + msg);
}
//Check if we have someone to deliver the message to.
@@ -296,9 +297,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
}
- if (!msg.isImmediate())
+ if (!msg.getPublishBody().immediate)
{
addMessageToQueue(msg);
@@ -308,7 +309,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +331,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +346,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index cac499587f..82d8f9538f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import java.util.concurrent.Executor;
import java.util.List;
@@ -66,11 +67,11 @@ interface DeliveryManager
* @param msg the message to deliver
* @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
*/
- void deliver(String name, AMQMessage msg) throws FailedDequeueException;
+ void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException;
- void removeAMessageFromTop() throws AMQException;
+ void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
- void clearAllMessages() throws AMQException;
+ void clearAllMessages(StoreContext storeContext) throws AMQException;
List<AMQMessage> getMessages();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
new file mode 100644
index 0000000000..e2758ce6cb
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ */
+public class InMemoryMessageHandle implements AMQMessageHandle
+{
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private BasicPublishBody _publishBody;
+
+ private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+
+ private boolean _redelivered;
+
+ public InMemoryMessageHandle()
+ {
+ }
+
+ public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+ {
+ return _contentHeaderBody;
+ }
+
+ public int getBodyCount(long messageId)
+ {
+ return _contentBodies.size();
+ }
+
+ public long getBodySize(long messageId) throws AMQException
+ {
+ return getContentHeaderBody(messageId).bodySize;
+ }
+
+ public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+ {
+ if (index > _contentBodies.size() - 1)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ return _contentBodies.get(index);
+ }
+
+ public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody)
+ throws AMQException
+ {
+ _contentBodies.add(contentBody);
+ }
+
+ public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ {
+ return _publishBody;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+
+ public void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ }
+
+ public boolean isPersistent(long messageId) throws AMQException
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ ContentHeaderBody chb = getContentHeaderBody(messageId);
+ return chb.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param publishBody
+ * @param contentHeaderBody
+ * @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ _publishBody = publishBody;
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+ {
+ // NO OP
+ }
+
+ public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ // NO OP
+ }
+
+ public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ // NO OP
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
index de5d0f55a7..c36cc6bd7b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,11 +23,12 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.management.MBeanAttribute;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.AMQException;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
-import javax.management.openmbean.TabularData;
import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
import java.io.IOException;
/**
@@ -70,7 +71,7 @@ public interface ManagedQueue
* @throws IOException
*/
@MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue")
- Long getQueueDepth() throws IOException;
+ Long getQueueDepth() throws IOException, JMException;
/**
* Returns the total number of active subscribers to the queue.
@@ -184,7 +185,7 @@ public interface ManagedQueue
description="Message headers for messages in this queue within given index range. eg. from index 1 - 100")
TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
@MBeanOperationParameter(name="to index", description="to index")int toIndex)
- throws IOException, JMException;
+ throws IOException, JMException, AMQException;
@MBeanOperation(name="viewMessageContent", description="The message content for given Message Id")
CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
new file mode 100644
index 0000000000..a7ada2c1f8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.store.MessageStore;
+
+/**
+ * Constructs a message handle based on the publish body, the content header and the queue to which the message
+ * has been routed.
+ *
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class MessageHandleFactory
+{
+
+ public AMQMessageHandle createMessageHandle(long messageId, MessageStore store, boolean persistent)
+ {
+ // just hardcoded for now
+ if (persistent)
+ {
+ return new WeakReferenceMessageHandle(store);
+ }
+ else
+ {
+ return new InMemoryMessageHandle();
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
new file mode 100644
index 0000000000..deed18c188
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
+ * single unit.
+ */
+public class MessageMetaData
+{
+ private BasicPublishBody _publishBody;
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private int _contentChunkCount;
+
+ public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ _publishBody = publishBody;
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public int getContentChunkCount()
+ {
+ return _contentChunkCount;
+ }
+
+ public void setContentChunkCount(int contentChunkCount)
+ {
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index 2d37b806f6..2049189e0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -20,13 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
/**
* Signals that no consumers exist for a message at a given point in time.
@@ -35,19 +30,9 @@ import java.util.List;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
- }
-
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index a5672f2b19..2dab551e07 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,11 +26,11 @@ import java.util.Queue;
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
+ void send(AMQMessage msg, AMQQueue queue) throws AMQException;
boolean isSuspended();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue) throws AMQException;
boolean hasFilters();
@@ -44,5 +44,5 @@ public interface Subscription
void close();
- boolean isBrowser();
+ boolean isBrowser();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 78310e8eb3..0dc1f3b0c1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,18 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.Queue;
@@ -201,7 +201,7 @@ public class SubscriptionImpl implements Subscription
* @param queue
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ public void send(AMQMessage msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
@@ -211,7 +211,7 @@ public class SubscriptionImpl implements Subscription
}
else
{
- sendToConsumer(msg, queue);
+ sendToConsumer(channel.getStoreContext(), msg, queue);
}
}
else
@@ -220,7 +220,7 @@ public class SubscriptionImpl implements Subscription
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -235,14 +235,12 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
- private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ throws AMQException
{
try
{
@@ -257,7 +255,11 @@ public class SubscriptionImpl implements Subscription
// the message is unacked, it will be lost.
if (!_acks)
{
- queue.dequeue(msg);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
synchronized(channel)
{
@@ -268,10 +270,7 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
finally
@@ -290,7 +289,7 @@ public class SubscriptionImpl implements Subscription
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
channel.queueDeleted(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 91e720ea54..8272202571 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -204,7 +204,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index c967ea2cde..f290452058 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.log4j.Logger;
import java.util.LinkedList;
@@ -41,11 +42,13 @@ class SynchronizedDeliveryManager implements DeliveryManager
* Holds any queued messages
*/
private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+
/**
* Ensures that only one asynchronous task is running for this manager at
* any time.
*/
private final AtomicBoolean _processing = new AtomicBoolean();
+
/**
* The subscriptions on the queue to whom messages are delivered
*/
@@ -70,9 +73,9 @@ class SynchronizedDeliveryManager implements DeliveryManager
_queue = queue;
}
- private synchronized boolean enqueue(AMQMessage msg)
+ private synchronized boolean enqueue(AMQMessage msg) throws AMQException
{
- if (msg.isImmediate())
+ if (msg.getPublishBody().immediate)
{
return false;
}
@@ -90,7 +93,7 @@ class SynchronizedDeliveryManager implements DeliveryManager
}
}
- private synchronized void startQueueing(AMQMessage msg)
+ private synchronized void startQueueing(AMQMessage msg) throws AMQException
{
_queueing = true;
enqueue(msg);
@@ -127,21 +130,21 @@ class SynchronizedDeliveryManager implements DeliveryManager
//no-op . This DM has no PreDeliveryQueues
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -231,7 +234,7 @@ class SynchronizedDeliveryManager implements DeliveryManager
* @throws NoConsumersException if there are no active subscribers to deliver
* the message to
*/
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
new file mode 100644
index 0000000000..0c50dc5207
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Contains data that is only used in AMQMessage transiently, e.g. while the content
+ * body fragments are arriving.
+ *
+ * Having this data stored in a separate class means that the AMQMessage class avoids
+ * the small overhead of numerous guaranteed-null references.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+ /**
+ * Stored temporarily until the header has been received at which point it is used when
+ * constructing the handle
+ */
+ private BasicPublishBody _publishBody;
+
+ /**
+ * Also stored temporarily.
+ */
+ private ContentHeaderBody _contentHeaderBody;
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ /**
+ * This is stored during routing, to know the queues to which this message should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
+ * by the message handle.
+ */
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+
+ public List<AMQQueue> getDestinationQueues()
+ {
+ return _destinationQueues;
+ }
+
+ public void setDestinationQueues(List<AMQQueue> destinationQueues)
+ {
+ _destinationQueues = destinationQueues;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public long getBodyLengthReceived()
+ {
+ return _bodyLengthReceived;
+ }
+
+ public void addBodyLength(int value)
+ {
+ _bodyLengthReceived += value;
+ }
+
+ public boolean isAllContentReceived() throws AMQException
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public void addDestinationQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ }
+
+ public boolean isPersistent()
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
new file mode 100644
index 0000000000..2fb2bdd2e3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+ private WeakReference<ContentHeaderBody> _contentHeaderBody;
+
+ private WeakReference<BasicPublishBody> _publishBody;
+
+ private List<WeakReference<ContentBody>> _contentBodies;
+
+ private boolean _redelivered;
+
+ private final MessageStore _messageStore;
+
+ public WeakReferenceMessageHandle(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+ {
+ ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
+ if (chb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ chb = mmd.getContentHeaderBody();
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
+ _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ }
+ return chb;
+ }
+
+ public int getBodyCount(long messageId) throws AMQException
+ {
+ if (_contentBodies == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ int chunkCount = mmd.getContentChunkCount();
+ _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+ for (int i = 0; i < chunkCount; i++)
+ {
+ _contentBodies.add(new WeakReference<ContentBody>(null));
+ }
+ }
+ return _contentBodies.size();
+ }
+
+ public long getBodySize(long messageId) throws AMQException
+ {
+ return getContentHeaderBody(messageId).bodySize;
+ }
+
+ public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+ {
+ if (index > _contentBodies.size() - 1)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ WeakReference<ContentBody> wr = _contentBodies.get(index);
+ ContentBody cb = wr.get();
+ if (cb == null)
+ {
+ cb = _messageStore.getContentBodyChunk(messageId, index);
+ _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+ }
+ return cb;
+ }
+
+ /**
+ * Content bodies are set <i>before</i> the publish and header frames
+ * @param storeContext
+ * @param messageId
+ * @param contentBody
+ * @throws AMQException
+ */
+ public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
+ {
+ if (_contentBodies == null)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
+ _contentBodies.add(new WeakReference<ContentBody>(contentBody));
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
+ }
+
+ public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ {
+ BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
+ if (bpb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ bpb = mmd.getPublishBody();
+ _publishBody = new WeakReference<BasicPublishBody>(bpb);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+ }
+ return bpb;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ }
+
+ public boolean isPersistent(long messageId) throws AMQException
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ ContentHeaderBody chb = getContentHeaderBody(messageId);
+ return chb.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param publishBody
+ * @param contentHeaderBody
+ * @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ // if there are no content bodies the list will be null so we must
+ // create en empty list here
+ if (contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
+ _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
+ _contentBodies.size()));
+ _publishBody = new WeakReference<BasicPublishBody>(publishBody);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+ }
+
+ public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+ {
+ _messageStore.removeMessage(storeContext, messageId);
+ }
+
+ public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+ }
+
+ public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 328aed81d9..edf2386314 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,12 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -43,21 +45,25 @@ public class MemoryMessageStore implements MessageStore
private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
- protected ConcurrentMap<Long, AMQMessage> _messageMap;
+ protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
+
+ protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
public void configure()
{
- _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
{
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(hashtableCapacity);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
@@ -67,70 +73,71 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
- if (_messageMap != null)
+ if (_metaDataMap != null)
{
- _messageMap.clear();
- _messageMap = null;
+ _metaDataMap.clear();
+ _metaDataMap = null;
+ }
+ if (_contentBodyMap != null)
+ {
+ _contentBodyMap.clear();
+ _contentBodyMap = null;
}
}
- public void put(AMQMessage msg)
- {
- _messageMap.put(msg.getMessageId(), msg);
- }
-
- public void removeMessage(long messageId)
+ public void removeMessage(StoreContext context, long messageId)
{
if (_log.isDebugEnabled())
{
_log.debug("Removing message with id " + messageId);
}
- _messageMap.remove(messageId);
+ _metaDataMap.remove(messageId);
+ _contentBodyMap.remove(messageId);
}
public void createQueue(AMQQueue queue) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
public void removeQueue(String name) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void commitTran() throws AMQException
+ public void commitTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void abortTran() throws AMQException
+ public void abortTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public boolean inTran()
+ public boolean inTran(StoreContext context)
{
return false;
}
public List<AMQQueue> createQueues() throws AMQException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public long getNewMessageId()
@@ -138,8 +145,33 @@ public class MemoryMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public AMQMessage getMessage(long messageId)
+ public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+ throws AMQException
+ {
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ if (bodyList == null)
+ {
+ bodyList = new ArrayList<ContentBody>();
+ _contentBodyMap.put(messageId, bodyList);
+ }
+
+ bodyList.add(index, contentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+ throws AMQException
+ {
+ _metaDataMap.put(messageId, messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ {
+ return _metaDataMap.get(messageId);
+ }
+
+ public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
{
- return _messageMap.get(messageId);
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ return bodyList.get(index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 8ee1d09862..973c661c06 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,8 +22,9 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import java.util.List;
@@ -37,34 +38,33 @@ public interface MessageStore
* @param base the base element identifier from which all configuration items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be "store.foo" etc.
* @param config the apache commons configuration object
+ * @throws Exception if an error occurs that means the store is unable to configure itself
*/
void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
- * @throws Exception
+ * @throws Exception if close fails
*/
void close() throws Exception;
- void put(AMQMessage msg) throws AMQException;
-
- void removeMessage(long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(String name) throws AMQException;
- void enqueueMessage(String name, long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void dequeueMessage(String name, long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void beginTran() throws AMQException;
+ void beginTran(StoreContext context) throws AMQException;
- void commitTran() throws AMQException;
+ void commitTran(StoreContext context) throws AMQException;
- void abortTran() throws AMQException;
+ void abortTran(StoreContext context) throws AMQException;
- boolean inTran();
+ boolean inTran(StoreContext context);
/**
* Recreate all queues that were persisted, including re-enqueuing of existing messages
@@ -78,6 +78,13 @@ public interface MessageStore
* @return a message id
*/
long getNewMessageId();
-}
+ void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
+ void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
+
+ MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+
+ ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
new file mode 100644
index 0000000000..55e5067852
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+/**
+ * A context that the store can use to associate with a transactional context. For example, it could store
+ * some kind of txn id.
+ *
+ * @author Apache Software Foundation
+ */
+public class StoreContext
+{
+ private Object _payload;
+
+ public Object getPayload()
+ {
+ return _payload;
+ }
+
+ public void setPayload(Object payload)
+ {
+ _payload = payload;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
new file mode 100644
index 0000000000..6d5d3c42f3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+ private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final List<RequiredDeliveryException> _returns;
+
+ public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns)
+ {
+ _msg = msg;
+ _returns = returns;
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ //don't need to do anything here, if the store's txn failed
+ //when processing prepare then the message was not stored
+ //or enqueued on any queues and can be discarded
+ }
+
+ public void commit(StoreContext context)
+ {
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference(context);
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e);
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " +
+ e, e);
+ }
+ }
+
+ public void rollback(StoreContext context)
+ {
+ // NO OP
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
new file mode 100644
index 0000000000..3934943665
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+ private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final AMQQueue _queue;
+
+ public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+ {
+ _msg = msg;
+ _queue = queue;
+ _msg.incrementReference();
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit(StoreContext context)
+ {
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
+ try
+ {
+ _queue.process(context, _msg);
+ }
+ catch (AMQException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback(StoreContext storeContext)
+ {
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
new file mode 100644
index 0000000000..ff6abdd58b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * A transactional context that only supports local transactions.
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+ private final TxnBuffer _txnBuffer;
+
+ /**
+ * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
+ * consolidated into a single operation
+ */
+ private TxAck _ackOp;
+
+ private List<RequiredDeliveryException> _returnMessages;
+
+ private final MessageStore _messageStore;
+
+ private final StoreContext _storeContext;
+
+ private boolean _inTran = false;
+
+ public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
+ TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
+ {
+ _messageStore = messageStore;
+ _storeContext = storeContext;
+ _txnBuffer = txnBuffer;
+ _returnMessages = returnMessages;
+ _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ }
+
+ public void rollback() throws AMQException
+ {
+ _txnBuffer.rollback(_storeContext);
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ // A publication will result in the enlisting of several
+ // TxnOps. The first is an op that will store the message.
+ // Following that (and ordering is important), an op will
+ // be added for every queue onto which the message is
+ // enqueued. Finally a cleanup op will be added to decrement
+ // the reference associated with the routing.
+
+ _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ }
+
+ private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ //check that the tag exists to give early failure
+ if (!multiple || deliveryTag > 0)
+ {
+ checkAck(deliveryTag, unacknowledgedMessageMap);
+ }
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if (_ackOp == null)
+ {
+ _ackOp = new TxAck(unacknowledgedMessageMap);
+ _txnBuffer.enlist(_ackOp);
+ }
+ // update the op to include this ack request
+ if (multiple && deliveryTag == 0)
+ {
+ // if have signalled to ack all, that refers only
+ // to all at this time
+ _ackOp.update(lastDeliveryTag, multiple);
+ }
+ else
+ {
+ _ackOp.update(deliveryTag, multiple);
+ }
+ }
+
+ public void messageFullyReceived(boolean persistent) throws AMQException
+ {
+ // Not required in this transactional context
+ }
+
+ public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+ {
+ // Not required in this transactional context
+ }
+
+ public void beginTranIfNecessary() throws AMQException
+ {
+ if (!_inTran)
+ {
+ _messageStore.beginTran(_storeContext);
+ _inTran = true;
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ if (_ackOp != null)
+ {
+ _ackOp.consolidate();
+ //already enlisted, after commit will reset regardless of outcome
+ _ackOp = null;
+ }
+
+ _txnBuffer.commit(_storeContext);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
new file mode 100644
index 0000000000..f09d811b50
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+ private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
+
+ /**
+ * Channel is useful for logging
+ */
+ private final AMQChannel _channel;
+
+ /**
+ * Where to put undeliverable messages
+ */
+ private final List<RequiredDeliveryException> _returnMessages;
+
+ private Set<Long> _browsedAcks;
+
+ private final MessageStore _messageStore;
+
+ private StoreContext _storeContext;
+
+ /**
+ * Whether we are in a transaction
+ */
+ private boolean _inTran;
+
+ public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+ List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
+ {
+ _channel = channel;
+ _storeContext = storeContext;
+ _returnMessages = returnMessages;
+ _messageStore = messageStore;
+ _browsedAcks = browsedAcks;
+ }
+
+ public void beginTranIfNecessary() throws AMQException
+ {
+ if (!_inTran)
+ {
+ _messageStore.beginTran(_storeContext);
+ _inTran = true;
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void rollback() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ try
+ {
+ message.incrementReference();
+ queue.process(_storeContext, message);
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ _returnMessages.add(e);
+ }
+ }
+
+ public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
+ boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
+ throws AMQException
+ {
+ if (multiple)
+ {
+ if (deliveryTag == 0)
+ {
+
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
+ // tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
+ unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + message.message.getMessageId());
+ }
+ message.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ unacknowledgedMessageMap.clear();
+ }
+ });
+ }
+ else
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ }
+
+ LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+ unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+ for (UnacknowledgedMessage msg : acked)
+ {
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+ msg.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+ }
+ }
+ }
+ else
+ {
+ UnacknowledgedMessage msg;
+ msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+ if (msg == null)
+ {
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ }
+ msg.discard(_storeContext);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
+ msg.message.getMessageId());
+ }
+ }
+ }
+
+ public void messageFullyReceived(boolean persistent) throws AMQException
+ {
+ if (persistent)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
+ }
+
+ public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+ {
+ _channel.processReturns(protocolSession);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
new file mode 100644
index 0000000000..0e4d6c2030
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * A transactional operation to store messages in an underlying persistent store. When this operation
+ * commits it will do everything to ensure that all messages are safely committed to persistent
+ * storage.
+ */
+public class StoreMessageOperation implements TxnOp
+{
+ private final MessageStore _messsageStore;
+
+ public StoreMessageOperation(MessageStore messageStore)
+ {
+ _messsageStore = messageStore;
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit(StoreContext context) throws AMQException
+ {
+ _messsageStore.commitTran(context);
+ }
+
+ public void rollback(StoreContext context) throws AMQException
+ {
+ _messsageStore.abortTran(context);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
new file mode 100644
index 0000000000..bba4b98de4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public interface TransactionalContext
+{
+ void beginTranIfNecessary() throws AMQException;
+
+ void commit() throws AMQException;
+
+ void rollback() throws AMQException;
+
+ void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+ void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+
+ void messageFullyReceived(boolean persistent) throws AMQException;
+
+ void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index 402065d5e1..069caf0ae1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,91 +22,63 @@ package org.apache.qpid.server.txn;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
import java.util.List;
/**
* Holds a list of TxnOp instance representing transactional
- * operations.
+ * operations.
*/
public class TxnBuffer
{
- private boolean _containsPersistentChanges = false;
- private final MessageStore _store;
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
private static final Logger _log = Logger.getLogger(TxnBuffer.class);
- public TxnBuffer(MessageStore store)
- {
- _store = store;
- }
-
- public void containsPersistentChanges()
+ public TxnBuffer()
{
- _containsPersistentChanges = true;
}
- public void commit() throws AMQException
+ public void commit(StoreContext context) throws AMQException
{
- if (_containsPersistentChanges)
+ if (prepare(context))
{
- _log.debug("Begin Transaction.");
- _store.beginTran();
- if(prepare())
+ for (TxnOp op : _ops)
{
- _log.debug("Transaction Succeeded");
- _store.commitTran();
- for (TxnOp op : _ops)
- {
- op.commit();
- }
+ op.commit(context);
}
- else
- {
- _log.debug("Transaction Failed");
- _store.abortTran();
- }
- }else{
- if(prepare())
- {
- for (TxnOp op : _ops)
- {
- op.commit();
- }
- }
}
_ops.clear();
}
- private boolean prepare()
- {
+ private boolean prepare(StoreContext context)
+ {
for (int i = 0; i < _ops.size(); i++)
{
TxnOp op = _ops.get(i);
try
{
- op.prepare();
+ op.prepare(context);
}
- catch(Exception e)
+ catch (Exception e)
{
//compensate previously prepared ops
for(int j = 0; j < i; j++)
{
_ops.get(j).undoPrepare();
- }
+ }
return false;
}
}
return true;
- }
+ }
- public void rollback() throws AMQException
+ public void rollback(StoreContext context) throws AMQException
{
for (TxnOp op : _ops)
{
- op.rollback();
+ op.rollback(context);
}
_ops.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
index e863bab73e..919c078cf0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
/**
* This provides the abstraction of an individual operation within a
@@ -29,14 +30,14 @@ import org.apache.qpid.AMQException;
public interface TxnOp
{
/**
- * Do the part of the operation that updates persistent state
+ * Do the part of the operation that updates persistent state
*/
- public void prepare() throws AMQException;
+ public void prepare(StoreContext context) throws AMQException;
/**
* Complete the operation started by prepare. Can now update in
* memory state or make netork transfers.
*/
- public void commit();
+ public void commit(StoreContext context) throws AMQException;
/**
* This is not the same as rollback. Unfortunately the use of an
* in memory reference count as a locking mechanism and a test for
@@ -50,5 +51,5 @@ public interface TxnOp
/**
* Rolls back the operation.
*/
- public void rollback();
+ public void rollback(StoreContext context) throws AMQException;
}