summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-11-03 17:17:11 +0000
committerRobert Greig <rgreig@apache.org>2006-11-03 17:17:11 +0000
commit13c0fdd366018a57ccaabe0aa8a01f752ca27356 (patch)
tree63a3e754e00c057c71d91a339f177643b1649984
parent53da5dbacf15a717760552de283519babd7e4736 (diff)
downloadqpid-python-13c0fdd366018a57ccaabe0aa8a01f752ca27356.tar.gz
QPID-32 Commit of work in progress changes. Note that with these changes the tree no longer compiles.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@470908 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java434
-rw-r--r--java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java70
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/TxAck.java32
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java39
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java150
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java5
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java4
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java4
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/Exchange.java2
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java8
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java3
-rw-r--r--java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java2
-rw-r--r--java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java74
-rw-r--r--java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java4
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessage.java371
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java53
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java89
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java30
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java21
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java26
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java4
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java99
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java84
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java63
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java118
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java146
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java47
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java29
29 files changed, 1293 insertions, 720 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 055af16159..748a4c121f 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -19,11 +19,9 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -31,20 +29,16 @@ import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.LocalTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,7 +50,7 @@ public class AMQChannel
private final int _channelId;
- private boolean _transactional;
+ //private boolean _transactional;
private long _prefetchCount;
@@ -92,21 +86,18 @@ public class AMQChannel
private final MessageStore _messageStore;
- private final Object _unacknowledgedMessageMapLock = new Object();
-
- private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
-
- private long _lastDeliveryTag;
+ //private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+ private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private final MessageRouter _exchanges;
- private final TxnBuffer _txnBuffer;
+ private TransactionalContext _txnContext;
- private TxAck ackOp;
+ private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+ private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
@@ -115,22 +106,21 @@ public class AMQChannel
_prefetchCount = DEFAULT_PREFETCH;
_messageStore = messageStore;
_exchanges = exchanges;
- _txnBuffer = new TxnBuffer(_messageStore);
- }
-
- public int getChannelId()
- {
- return _channelId;
+ // TODO: fix me to pass in the txn context or at least have a factory for it
+ _txnContext = new NonTransactionalContext(this, _returnMessages);
}
- public boolean isTransactional()
+ /**
+ * Sets this channel to be part of a local transaction
+ */
+ public void setLocalTransactional()
{
- return _transactional;
+ _txnContext = new LocalTransactionalContext(new TxnBuffer(_messageStore), _returnMessages);
}
- public void setTransactional(boolean transactional)
+ public int getChannelId()
{
- _transactional = transactional;
+ return _channelId;
}
public long getPrefetchCount()
@@ -145,8 +135,9 @@ public class AMQChannel
public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore, publishBody);
- _currentMessage.setPublisher(publisher);
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody, _txnContext);
+ // TODO: used in clustering only I think (RG)
+ //_currentMessage.setPublisher(publisher);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -154,15 +145,18 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
+ throw new AMQException("Received content header without previously receiving a BasicPublish frame");
}
else
{
_currentMessage.setContentHeaderBody(contentHeaderBody);
- // check and route if header says body length is zero
+ routeCurrentMessage();
+ _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+
+ // check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
- routeCurrentMessage();
+ _currentMessage = null;
}
}
}
@@ -174,24 +168,36 @@ public class AMQChannel
{
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_currentMessage.getContentHeaderBody() == null)
+
+ // returns true iff the message was delivered (i.e. if all data was
+ // received
+ try
{
- throw new AMQException("Received content body without previously receiving a content header");
+ if (_currentMessage.addContentBodyFrame(contentBody))
+ {
+ _currentMessage = null;
+ }
}
-
- _currentMessage.addContentBodyFrame(contentBody);
- if (_currentMessage.isAllContentReceived())
+ catch (AMQException e)
{
- routeCurrentMessage();
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
}
}
protected void routeCurrentMessage() throws AMQException
{
+ _exchanges.routeContent(_currentMessage);
+ }
+
+ /*protected void routeCurrentMessage2() throws AMQException
+ {
if (_transactional)
{
- //don't create a transaction unless needed
- if(_currentMessage.isPersistent())
+ // don't create a transaction unless needed
+ if (_currentMessage.isPersistent())
{
_txnBuffer.containsPersistentChanges();
}
@@ -210,7 +216,7 @@ public class AMQChannel
_exchanges.routeContent(_currentMessage);
_txnBuffer.enlist(new Cleanup(_currentMessage));
}
- catch(RequiredDeliveryException e)
+ catch (RequiredDeliveryException e)
{
//Can only be due to the mandatory flag, as no attempt
//has yet been made to deliver the message. The
@@ -236,12 +242,12 @@ public class AMQChannel
_currentMessage.checkDeliveredToConsumer();
}
finally
- {
+ {
_currentMessage.decrementReference();
_currentMessage = null;
}
}
- }
+ } */
public long getNextDeliveryTag()
{
@@ -266,7 +272,8 @@ public class AMQChannel
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks)
+ throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -304,13 +311,14 @@ public class AMQChannel
*/
public void close(AMQProtocolSession session) throws AMQException
{
- if (_transactional)
+ /*if (_transactional)
{
- synchronized(_txnBuffer)
+ synchronized (_txnBuffer)
{
_txnBuffer.rollback();//releases messages
}
- }
+ } */
+ _txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
}
@@ -328,39 +336,32 @@ public class AMQChannel
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message
- * @param deliveryTag
- * @param queue
+ * @param message the message that was delivered
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
+ * the delivery tag)
+ * @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized(_unacknowledgedMessageMapLock)
- {
- _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
- _lastDeliveryTag = deliveryTag;
- checkSuspension();
- }
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ checkSuspension();
}
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
* May result in delivery to this same channel or to other subscribers.
+ * @throws org.apache.qpid.AMQException if the requeue fails
*/
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Map<Long, UnacknowledgedMessage> currentList;
- synchronized(_unacknowledgedMessageMapLock)
- {
- currentList = _unacknowledgedMessageMap;
- _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
- }
+ Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- for (UnacknowledgedMessage unacked : currentList.values())
+ for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- unacked.queue.deliver(unacked.message);
+ _txnContext.deliver(unacked.message, unacked.queue);
}
}
}
@@ -368,53 +369,62 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(AMQProtocolSession session)
+ public void resend(final AMQProtocolSession session) throws AMQException
{
- //messages go to this channel
- synchronized(_unacknowledgedMessageMapLock)
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+ public boolean callback(UnacknowledgedMessage message)
{
- long deliveryTag = entry.getKey();
- String consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
+ long deliveryTag = message.deliveryTag;
+ String consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
- session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ // false means continue processing
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
* Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
* messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item sine we may get an ack for a delivery tag that was generated from the
+ * actually removing the item since we may get an ack for a delivery tag that was generated from the
* deleted queue.
*
- * @param queue
+ * @param queue the queue that has been deleted
+ * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(final AMQQueue queue) throws AMQException
{
- synchronized(_unacknowledgedMessageMapLock)
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- final UnacknowledgedMessage unackedMsg = unacked.getValue();
- // we can compare the reference safely in this case
- if (unackedMsg.queue == queue)
+ if (message.queue == queue)
{
- unackedMsg.queue = null;
+ message.queue = null;
try
{
- unackedMsg.message.decrementReference();
+ message.message.decrementReference();
}
catch (AMQException e)
{
- _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
+ _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
e, e);
}
}
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -427,20 +437,22 @@ public class AMQChannel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- if (_transactional)
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+ checkSuspension();
+ /*if (_transactional)
{
//check that the tag exists to give early failure
- if(!multiple || deliveryTag > 0)
+ if (!multiple || deliveryTag > 0)
{
checkAck(deliveryTag);
}
//we use a single txn op for all acks and update this op
//as new acks come in. If this is the first ack in the txn
//we will need to create and enlist the op.
- if(ackOp == null)
+ if (_ackOp == null)
{
- ackOp = new TxAck(new AckMap());
- _txnBuffer.enlist(ackOp);
+ _ackOp = new TxAck(new AckMap());
+ _txnBuffer.enlist(_ackOp);
}
//update the op to include this ack request
if(multiple && deliveryTag == 0)
@@ -449,108 +461,18 @@ public class AMQChannel
{
//if have signalled to ack all, that refers only
//to all at this time
- ackOp.update(_lastDeliveryTag, multiple);
+ _ackOp.update(_lastDeliveryTag, multiple);
}
}
else
{
- ackOp.update(deliveryTag, multiple);
+ _ackOp.update(deliveryTag, multiple);
}
}
else
{
handleAcknowledgement(deliveryTag, multiple);
- }
- }
-
- private void checkAck(long deliveryTag) throws AMQException
- {
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
- }
- }
- }
-
- private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
- {
- if (multiple)
- {
- LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (deliveryTag == 0)
- {
- //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
- _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
- acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
- _unacknowledgedMessageMap.clear();
- }
- else
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
- }
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
-
- while (i.hasNext())
- {
-
- Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
-
- if (unacked.getKey() > deliveryTag)
- {
- //This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
- }
-
- i.remove();
-
- acked.add(unacked.getValue());
- if (unacked.getKey() == deliveryTag)
- {
- break;
- }
- }
- }
- }// synchronized
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
- }
-
- for (UnacknowledgedMessage msg : acked)
- {
- msg.discard();
- }
-
- }
- else
- {
- UnacknowledgedMessage msg;
- synchronized(_unacknowledgedMessageMapLock)
- {
- msg = _unacknowledgedMessageMap.remove(deliveryTag);
- }
-
- if (msg == null)
- {
- _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- }
- msg.discard();
- if (_log.isDebugEnabled())
- {
- _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
- }
- }
-
- checkSuspension();
+ } */
}
/**
@@ -558,19 +480,14 @@ public class AMQChannel
*
* @return the map of unacknowledged messages
*/
- public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+ public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
{
return _unacknowledgedMessageMap;
}
private void checkSuspension()
{
- boolean suspend;
- //noinspection SynchronizeOnNonFinalField
- synchronized(_unacknowledgedMessageMapLock)
- {
- suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
- }
+ boolean suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
setSuspended(suspend);
}
@@ -602,33 +519,18 @@ public class AMQChannel
public void commit() throws AMQException
{
- if(ackOp != null)
- {
- ackOp.consolidate();
- if(ackOp.checkPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- }
- ackOp = null;//already enlisted, after commit will reset regardless of outcome
- }
-
- _txnBuffer.commit();
- //TODO: may need to return 'immediate' messages at this point
+ _txnContext.commit();
}
public void rollback() throws AMQException
{
- //need to protect rollback and close from each other...
- synchronized(_txnBuffer)
- {
- _txnBuffer.rollback();
- }
+ _txnContext.rollback();
}
public String toString()
{
StringBuilder sb = new StringBuilder(30);
- sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
+ sb.append("Channel: id ").append(_channelId).append(", transaction context: ").append(_txnContext);
sb.append(", prefetch count: ").append(_prefetchCount);
return sb.toString();
}
@@ -638,7 +540,6 @@ public class AMQChannel
{
StringBuilder sb = new StringBuilder(30);
sb.append("Channel:id=").append(_channelId);
- sb.append(",transaction mode=").append(_transactional);
return new ObjectName(sb.toString());
}
@@ -654,112 +555,11 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session)
{
- for(AMQDataBlock block : _returns)
- {
- session.writeFrame(block);
- }
- _returns.clear();
- }
-
- //we use this wrapper to ensure we are always using the correct
- //map instance (its not final unfortunately)
- private class AckMap implements UnacknowledgedMessageMap
- {
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
- {
- impl().collect(deliveryTag, multiple, msgs);
- }
- public void remove(List<UnacknowledgedMessage> msgs)
- {
- impl().remove(msgs);
- }
-
- private UnacknowledgedMessageMap impl()
- {
- return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
- }
- }
-
- private class Store implements TxnOp
- {
- //just use this to do a store of the message during the
- //prepare phase. Any enqueueing etc is done by TxnOps enlisted
- //by the queues themselves.
- private final AMQMessage _msg;
-
- Store(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- _msg.storeMessage();
- //the routers reference can now be released
- _msg.decrementReference();
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit()
- {
- }
-
- public void rollback()
- {
- }
- }
-
- private class Cleanup implements TxnOp
- {
- private final AMQMessage _msg;
-
- Cleanup(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- }
-
- public void undoPrepare()
- {
- //don't need to do anything here, if the store's txn failed
- //when processing prepare then the message was not stored
- //or enqueued on any queues and can be discarded
- }
-
- public void commit()
- {
- //The routers reference can now be released. This is done
- //here to ensure that it happens after the queues that
- //enqueue it have incremented their counts (which as a
- //memory only operation is done in the commit phase).
- try
- {
- _msg.decrementReference();
- }
- catch(AMQException e)
- {
- _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
- }
- try
- {
- _msg.checkDeliveredToConsumer();
- }
- catch(NoConsumersException e)
- {
- //TODO: store this for delivery after the commit-ok
- _returns.add(e.getReturnMessage(_channelId));
- }
- }
-
- public void rollback()
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), bouncedMessage.getMessage());
}
+ _returnMessages.clear();
}
-
}
diff --git a/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
index 6c4fb6b730..d2b017275a 100644
--- a/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
@@ -17,17 +17,9 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
-import java.util.List;
-
/**
* Signals that a required delivery could not be made. This could be bacuse of
* the immediate flag being set and the queue having no consumers, or the mandatory
@@ -35,75 +27,23 @@ import java.util.List;
*/
public abstract class RequiredDeliveryException extends AMQException
{
- private final String _message;
- private final BasicPublishBody _publishBody;
- private final ContentHeaderBody _contentHeaderBody;
- private final List<ContentBody> _contentBodies;
+ private final AMQMessage _amqMessage;
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
- _message = message;
- _publishBody = payload.getPublishBody();
- _contentHeaderBody = payload.getContentHeaderBody();
- _contentBodies = payload.getContentBodies();
- }
-
- public RequiredDeliveryException(String message,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super(message);
- _message = message;
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
- _contentBodies = contentBodies;
+ _amqMessage = payload;
}
- public BasicPublishBody getPublishBody()
+ public AMQMessage getAMQMessage()
{
- return _publishBody;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public List<ContentBody> getContentBodies()
- {
- return _contentBodies;
- }
-
- public CompositeAMQDataBlock getReturnMessage(int channel)
- {
- BasicReturnBody returnBody = new BasicReturnBody();
- returnBody.exchange = _publishBody.exchange;
- returnBody.replyCode = getReplyCode();
- returnBody.replyText = _message;
- returnBody.routingKey = _publishBody.routingKey;
-
- AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
- AMQFrame returnFrame = new AMQFrame();
- returnFrame.bodyFrame = returnBody;
- returnFrame.channel = channel;
-
- allFrames[0] = returnFrame;
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 2; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
- }
-
- return new CompositeAMQDataBlock(allFrames);
+ return _amqMessage;
}
public int getErrorCode()
{
return getReplyCode();
- }
+ }
public abstract int getReplyCode();
}
diff --git a/java/broker/src/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
index 0d502a8f6e..85072b6976 100644
--- a/java/broker/src/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
@@ -17,19 +17,15 @@
*/
package org.apache.qpid.server.ack;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.TxnOp;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* A TxnOp implementation for handling accumulated acks
- */
+ */
public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
@@ -45,14 +41,14 @@ public class TxAck implements TxnOp
public void update(long deliveryTag, boolean multiple)
{
- if(!multiple)
+ if (!multiple)
{
//have acked a single message that is not part of
//the previously acked region so record
//individually
_individual.add(deliveryTag);//_multiple && !multiple
}
- else if(deliveryTag > _deliveryTag)
+ else if (deliveryTag > _deliveryTag)
{
//have simply moved the last acked message on a
//bit
@@ -64,7 +60,7 @@ public class TxAck implements TxnOp
public void consolidate()
{
//lookup all the unacked messages that have been acked in this transaction
- if(_multiple)
+ if (_multiple)
{
//get all the unacked messages for the accumulated
//multiple acks
@@ -72,22 +68,22 @@ public class TxAck implements TxnOp
}
//get any unacked messages for individual acks outside the
//range covered by multiple acks
- for(long tag : _individual)
+ for (long tag : _individual)
{
if(_deliveryTag < tag)
{
- _map.collect(tag, false, _unacked);
+ _map.collect(tag, false, _unacked);
}
}
}
public boolean checkPersistent() throws AMQException
- {
+ {
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
- for(UnacknowledgedMessage msg : _unacked)
+ for (UnacknowledgedMessage msg : _unacked)
{
- if(msg.message.isPersistent())
+ if (msg.message.isPersistent())
{
return true;
}
@@ -98,22 +94,22 @@ public class TxAck implements TxnOp
public void prepare() throws AMQException
{
//make persistent changes, i.e. dequeue and decrementReference
- for(UnacknowledgedMessage msg : _unacked)
+ for (UnacknowledgedMessage msg : _unacked)
{
msg.discard();
}
}
-
+
public void undoPrepare()
{
//decrementReference is annoyingly untransactional (due to
//in memory counter) so if we failed in prepare for full
//txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for(UnacknowledgedMessage msg : _unacked)
+ //in memory (persistent changes will be rolled back by store)
+ for (UnacknowledgedMessage msg : _unacked)
{
msg.message.incrementReference();
- }
+ }
}
public void commit()
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index ccb55f7e2d..8fece7bcc1 100644
--- a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -17,11 +17,46 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.AMQException;
+
+import java.util.Collection;
import java.util.List;
public interface UnacknowledgedMessageMap
{
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
- public void remove(List<UnacknowledgedMessage> msgs);
+ public interface Visitor
+ {
+ /**
+ * @param message the message being iterated over
+ * @return true to stop iteration, false to continue
+ * @throws AMQException
+ */
+ boolean callback(UnacknowledgedMessage message) throws AMQException;
+
+ void visitComplete();
+ }
+
+ void visit(Visitor visitor) throws AMQException;
+
+ void add(long deliveryTag, UnacknowledgedMessage message);
+
+ void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+
+ boolean contains(long deliveryTag) throws AMQException;
+
+ void remove(List<UnacknowledgedMessage> msgs);
+
+ UnacknowledgedMessage remove(long deliveryTag);
+
+ void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException;
+
+ Collection<UnacknowledgedMessage> cancelAllMessages();
+
+ void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
+
+ int size();
+
+ void clear();
}
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 117793930f..906ef9463c 100644
--- a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -17,19 +17,34 @@
*/
package org.apache.qpid.server.ack;
-import java.util.List;
-import java.util.Map;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.AMQException;
+
+import java.util.*;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private final Object _lock;
+ private final Object _lock = new Object();
+
private Map<Long, UnacknowledgedMessage> _map;
- public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+ private long _lastDeliveryTag;
+
+ private final int _prefetchLimit;
+
+ public UnacknowledgedMessageMapImpl(int prefetchLimit)
+ {
+ _prefetchLimit = prefetchLimit;
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+ }
+
+ /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
{
_lock = lock;
_map = map;
- }
+ } */
public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
{
@@ -44,20 +59,133 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
+ public boolean contains(long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ return _map.containsKey(deliveryTag);
+ }
+ }
+
public void remove(List<UnacknowledgedMessage> msgs)
{
- synchronized(_lock)
+ synchronized (_lock)
{
for(UnacknowledgedMessage msg : msgs)
{
_map.remove(msg.deliveryTag);
- }
+ }
+ }
+ }
+
+ public UnacknowledgedMessage remove(long deliveryTag)
+ {
+ synchronized (_lock)
+ {
+ return _map.remove(deliveryTag);
}
}
+ public void visit(Visitor visitor) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ for (UnacknowledgedMessage msg: currentEntries)
+ {
+ visitor.callback(msg);
+ }
+ }
+ }
+
+ public void add(long deliveryTag, UnacknowledgedMessage message)
+ {
+ synchronized( _lock)
+ {
+ _map.put(deliveryTag, message);
+ _lastDeliveryTag = deliveryTag;
+ }
+ }
+
+ public Collection<UnacknowledgedMessage> cancelAllMessages()
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ return currentEntries;
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
+ throws AMQException
+ {
+ synchronized (_lock)
+ {
+ txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
+ }
+ }
+
+ public int size()
+ {
+ synchronized (_lock)
+ {
+ return _map.size();
+ }
+ }
+
+ public void clear()
+ {
+ synchronized (_lock)
+ {
+ _map.clear();
+ }
+ }
+
+ public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+ " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+ }
+
+ it.remove();
+
+ destination.add(unacked.getValue());
+ if (unacked.getKey() == deliveryTag)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ public void resendMessages(AMQProtocolSession protocolSession, int channelId)
+ {
+ synchronized (_lock)
+ {
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ long deliveryTag = entry.getKey();
+ String consumerTag = entry.getValue().consumerTag;
+ AMQMessage msg = entry.getValue().message;
+
+ msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ }
+ }
+ }
private UnacknowledgedMessage get(long key)
{
- synchronized(_lock)
+ synchronized (_lock)
{
return _map.get(key);
}
@@ -65,7 +193,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
private void collect(long key, List<UnacknowledgedMessage> msgs)
{
- synchronized(_lock)
+ synchronized (_lock)
{
for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
{
@@ -73,9 +201,11 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if (entry.getKey() == key)
{
break;
- }
+ }
}
}
}
+
+
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 06a3944367..62b0415253 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -17,10 +17,10 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -81,8 +81,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
final String exchange = payload.getPublishBody().exchange;
final Exchange exch = _exchangeMap.get(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
- // the JmsPublish being received (where the exchange is validated) and the final
+ // the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
+ // TODO: check where the exchange is validated
if (exch == null)
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
index f89223825f..e6cc683b15 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -212,8 +212,8 @@ public class DestNameExchange extends AbstractExchange
}
for (AMQQueue q : queues)
- {
- q.deliver(payload);
+ {
+ payload.registerQueue(q);
}
}
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
index 25d91027e9..6e99eb5c21 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -183,8 +183,8 @@ public class DestWildExchange extends AbstractExchange
{
// TODO: modify code generator to add clone() method then clone the deliver body
// without this addition we have a race condition - we will be modifying the body
- // before the encoder has encoded the body for delivery
- q.deliver(payload);
+ // before the encoder has encoded the body for delivery
+ payload.registerQueue(q);
}
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/org/apache/qpid/server/exchange/Exchange.java
index cd75825601..02a0badb65 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/Exchange.java
@@ -19,8 +19,8 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
public interface Exchange
{
diff --git a/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
index 1167d05875..3b19c11ae4 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -175,7 +175,7 @@ public class HeadersExchange extends AbstractExchange
{
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
- boolean delivered = false;
+ boolean routed = false;
for (Registration e : _bindings)
{
if (e.binding.matches(headers))
@@ -185,11 +185,11 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
headers + " to " + e.queue.getName());
}
- e.queue.deliver(payload);
- delivered = true;
+ payload.registerQueue(e.queue);
+ routed = true;
}
}
- if (!delivered)
+ if (!routed)
{
_logger.warn("Exchange " + getName() + ": message not routable.");
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java b/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java
index 2eef5f0676..832a60c681 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java
@@ -17,8 +17,8 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
/**
* Separated out from the ExchangeRegistry interface to allow components
@@ -30,6 +30,7 @@ public interface MessageRouter
/**
* Routes content through exchanges, delivering it to 1 or more queues.
* @param message the message to be routed
+ *
* @throws org.apache.qpid.AMQException if something goes wrong delivering data
*/
void routeContent(AMQMessage message) throws AMQException;
diff --git a/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java
index d55930489c..cf665950ca 100644
--- a/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -44,7 +44,7 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
+ protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
}
}
diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 35bc7fb4c7..9ede6157f4 100644
--- a/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -25,44 +25,17 @@ import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.*;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
-import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.ObjectName;
+import javax.management.*;
import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
+import javax.management.openmbean.*;
import javax.security.sasl.SaslServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -104,9 +77,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private boolean _closed;
private long _maxNoOfChannels;
-
+
/* AMQP Version for this session */
-
+
private byte _major;
private byte _minor;
@@ -240,10 +213,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- if (channel.isTransactional())
- {
- channel.commit();
- }
+ channel.commit();
}
catch(AMQException ex)
{
@@ -260,10 +230,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- if (channel.isTransactional())
- {
- channel.rollback();
- }
+ channel.rollback();
}
catch(AMQException ex)
{
@@ -297,7 +264,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
Object[] itemValues = {channel.getChannelId(),
channelObjectName,
- channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
channel.getUnacknowledgedMessageMap().size()};
@@ -448,17 +414,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
else
{
- try
- {
- contentFrameReceived(frame);
- }
- catch (RequiredDeliveryException e)
- {
- //need to return the message:
- _logger.info("Returning message to " + this + " channel " + frame.channel
- + ": " + e.getMessage());
- writeFrame(e.getReturnMessage(frame.channel));
- }
+ contentFrameReceived(frame);
}
}
}
@@ -539,9 +495,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * to calling getProtocolSession().writeDeliver().
*
- * @param frame the frame to write
+ * @param frame the frame to writeDeliver
*/
public void writeFrame(AMQDataBlock frame)
{
@@ -702,22 +658,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_saslServer = saslServer;
}
-
+
/**
* Convenience methods for managing AMQP version.
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
-
+
public byte getAmqpMajor()
{
return _major;
}
-
+
public byte getAmqpMinor()
{
return _minor;
}
-
+
public boolean amqpVersionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 4478a0cf32..78fb76ae27 100644
--- a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -139,7 +139,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
_logger.debug("Protocol Session [" + this + "] idle: " + status);
if(IdleStatus.WRITER_IDLE.equals(status))
{
- //write heartbeat frame:
+ //writeDeliver heartbeat frame:
session.write(HeartbeatBody.FRAME);
}
else if(IdleStatus.READER_IDLE.equals(status))
@@ -165,7 +165,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
}
else if(throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
}
else
{
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index 7666b5b3f8..825d4feae4 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -18,17 +18,13 @@
package org.apache.qpid.server.queue;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.txn.TransactionalContext;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -36,43 +32,35 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class AMQMessage
{
+ /**
+ * Used in clustering
+ */
private final Set<Object> _tokens = new HashSet<Object>();
+ /**
+ * Used in clustering
+ * TODO need to get rid of this
+ */
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
-
- private ContentHeaderBody _contentHeaderBody;
-
- private List<ContentBody> _contentBodies;
-
- private boolean _redelivered;
-
private final long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
+ private AMQMessageHandle _messageHandle;
/**
- * The message store in which this message is contained.
+ * Stored temporarily until the header has been received at which point it is used when
+ * constructing the handle
*/
- private transient final MessageStore _store;
+ private BasicPublishBody _publishBody;
/**
- * For non transactional publishes, a message can be stored as
- * soon as it is complete. For transactional messages it doesnt
- * need to be stored until the transaction is committed.
+ * Keeps a track of how many bytes we have received in body frames
*/
- private boolean _storeWhenComplete;
+ private long _bodyLengthReceived = 0;
- /**
- * TxnBuffer for transactionally published messages
- */
- private TxnBuffer _txnBuffer;
+ private final TransactionalContext _txnContext;
/**
* Flag to indicate whether message has been delivered to a
@@ -81,55 +69,97 @@ public class AMQMessage
*/
private boolean _deliveredToConsumer;
+ private ContentHeaderBody _contentHeaderBody;
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
- {
- this(messageStore, publishBody, true);
- }
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not
+ * keep all the data in memory therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
- _messageId = messageStore.getNewMessageId();
- _publishBody = publishBody;
- _store = messageStore;
- _contentBodies = new LinkedList<ContentBody>();
- _storeWhenComplete = storeWhenComplete;
+ private int _channel;
+
+ private int _index = -1;
+
+ private BodyFrameIterator(int channel)
+ {
+ _channel = channel;
+ }
+
+ public boolean hasNext()
+ {
+ return _index < _messageHandle.getBodyCount();
+ }
+
+ public AMQDataBlock next()
+ {
+ return ContentBody.createAMQFrame(_channel, _messageHandle.getContentBody(++_index));
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
}
- public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
- throws AMQException
-
+ private class BodyContentIterator implements Iterator<ContentBody>
{
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
- _contentBodies = contentBodies;
- _messageId = messageId;
- _store = store;
- storeMessage();
+
+ private int _index = -1;
+
+ public boolean hasNext()
+ {
+ return _index < _messageHandle.getBodyCount();
+ }
+
+ public ContentBody next()
+ {
+ return _messageHandle.getContentBody(++_index);
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
}
- public AMQMessage(MessageStore store, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
- throws AMQException
+ public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
{
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ _messageId = messageId;
+ _txnContext = txnContext;
+ _publishBody = publishBody;
}
protected AMQMessage(AMQMessage msg) throws AMQException
{
- this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
+ _publisher = msg._publisher;
+ _messageId = msg._messageId;
+ _messageHandle = msg._messageHandle;
+ _txnContext = msg._txnContext;
+ _deliveredToConsumer = msg._deliveredToConsumer;
}
public void storeMessage() throws AMQException
{
- if (isPersistent())
+ /*if (isPersistent())
{
_store.put(this);
- }
+ } */
+ }
+
+ public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
+ {
+ return new BodyFrameIterator(channel);
+ }
+
+ public Iterator<ContentBody> getContentBodyIterator()
+ {
+ return new BodyContentIterator();
}
- public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel)
+ /*public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel)
{
AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
@@ -167,80 +197,76 @@ public class AMQMessage
public BasicPublishBody getPublishBody()
{
return _publishBody;
- }
+ } */
public ContentHeaderBody getContentHeaderBody()
{
- return _contentHeaderBody;
+ return _messageHandle.getContentHeaderBody();
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ throws AMQException
{
_contentHeaderBody = contentHeaderBody;
- if (_storeWhenComplete && isAllContentReceived())
- {
- storeMessage();
- }
}
- public List<ContentBody> getContentBodies()
+ public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException
{
- return _contentBodies;
- }
-
- public void setContentBodies(List<ContentBody> contentBodies)
- {
- _contentBodies = contentBodies;
+ _messageHandle = factory.createMessageHandle(store, _publishBody, _contentHeaderBody);
+ if (_contentHeaderBody.bodySize == 0)
+ {
+ deliver();
+ }
}
- public void addContentBodyFrame(ContentBody contentBody) throws AMQException
+ public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
{
- _contentBodies.add(contentBody);
_bodyLengthReceived += contentBody.getSize();
- if (_storeWhenComplete && isAllContentReceived())
+ _messageHandle.addContentBodyFrame(contentBody);
+ if (isAllContentReceived())
{
- storeMessage();
+ deliver();
+ return true;
+ }
+ else
+ {
+ return false;
}
}
public boolean isAllContentReceived()
{
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ return _bodyLengthReceived == _messageHandle.getBodySize();
}
public boolean isRedelivered()
{
- return _redelivered;
+ return _messageHandle.isRedelivered();
}
String getExchangeName()
{
- return _publishBody.exchange;
+ return _messageHandle.getExchangeName();
}
String getRoutingKey()
{
- return _publishBody.routingKey;
+ return _messageHandle.getRoutingKey();
}
boolean isImmediate()
{
- return _publishBody.immediate;
- }
-
- NoConsumersException getNoConsumersException(String queue)
- {
- return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
+ return _messageHandle.isImmediate();
}
- void setRedelivered(boolean redelivered)
+ public long getMessageId()
{
- _redelivered = redelivered;
+ return _messageId;
}
- public long getMessageId()
+ public BasicPublishBody getPublishBody()
{
- return _messageId;
+ return _publishBody;
}
/**
@@ -254,6 +280,9 @@ public class AMQMessage
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
*/
public void decrementReference() throws MessageCleanupException
{
@@ -261,19 +290,19 @@ public class AMQMessage
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the delivery manager decrements.
- if (_referenceCount.decrementAndGet() == 0)
+ /*if (_referenceCount.decrementAndGet() == 0)
{
try
{
_store.removeMessage(_messageId);
}
- catch(AMQException e)
+ catch (AMQException e)
{
//to maintain consistency, we revert the count
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
- }
+ } */
}
public void setPublisher(AMQProtocolSession publisher)
@@ -288,7 +317,7 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens.contains(token))
+ if (_tokens.contains(token))
{
return true;
}
@@ -304,54 +333,40 @@ public class AMQMessage
//if the message is not persistent or the queue is not durable
//we will not need to recover the association and so do not
//need to record it
- if(isPersistent() && queue.isDurable())
+ /*if (isPersistent() && queue.isDurable())
{
_store.enqueueMessage(queue.getName(), _messageId);
- }
+ } */
}
public void dequeue(AMQQueue queue) throws AMQException
{
//only record associations where both queue and message will survive
//a restart, so only need to remove association if this is the case
- if(isPersistent() && queue.isDurable())
+ /*if (isPersistent() && queue.isDurable())
{
_store.dequeueMessage(queue.getName(), _messageId);
- }
+ } */
}
public boolean isPersistent() throws AMQException
{
- if(_contentHeaderBody == null)
- {
- throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
- }
+ return _messageHandle.isPersistent();
- //todo remove literal values to a constant file such as AMQConstants in common
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
- }
-
- public void setTxnBuffer(TxnBuffer buffer)
- {
- _txnBuffer = buffer;
- }
-
- public TxnBuffer getTxnBuffer()
- {
- return _txnBuffer;
}
/**
- * Called to enforce the 'immediate' flag.
+ * Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException{
- if(isImmediate() && !_deliveredToConsumer)
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ if (isImmediate() && !_deliveredToConsumer)
{
- throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+ throw new NoConsumersException(this);
}
}
@@ -359,7 +374,121 @@ public class AMQMessage
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
*/
- public void setDeliveredToConsumer(){
+ public void setDeliveredToConsumer()
+ {
_deliveredToConsumer = true;
}
+
+ /**
+ * Registers a queue to which this message is to be delivered. This is
+ * called from the exchange when it is routing the message. This will be called before any content bodies have
+ * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ *
+ * @param queue the queue
+ */
+ public void registerQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ }
+
+ private void deliver() throws AMQException
+ {
+ for (AMQQueue q : _destinationQueues)
+ {
+ _txnContext.deliver(this, q);
+ }
+ }
+
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag)
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ protocolSession.writeFrame(bodyFrameIterator.next());
+ }
+
+ }
+
+ private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag)
+ {
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, consumerTag,
+ deliveryTag, false, getExchangeName(),
+ getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText)
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, _publishBody.exchange,
+ _publishBody.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ protocolSession.writeFrame(bodyFrameIterator.next());
+ }
+ }
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
new file mode 100644
index 0000000000..62ad28237f
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -0,0 +1,53 @@
+/**
+ * User: Robert Greig
+ * Date: 23-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public interface AMQMessageHandle
+{
+ ContentHeaderBody getContentHeaderBody();
+
+ void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException;
+
+ /**
+ * @return the number of body frames associated with this message
+ */
+ int getBodyCount();
+
+ /**
+ * @return the size of the body
+ */
+ long getBodySize();
+
+ /**
+ * Get a particular content body
+ * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1
+ * @return a content body
+ * @throws IllegalArgumentException if the index is invalid
+ */
+ ContentBody getContentBody(int index) throws IllegalArgumentException;
+
+ void addContentBodyFrame(ContentBody contentBody) throws AMQException;
+
+ String getExchangeName();
+
+ String getRoutingKey();
+
+ boolean isImmediate();
+
+ boolean isRedelivered();
+
+ boolean isPersistent() throws AMQException;
+} \ No newline at end of file
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 7fb6ddcb5c..914dafe6e0 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,24 +21,15 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
-
-import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
+
+import javax.management.*;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.*;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@@ -284,7 +275,7 @@ public class AMQQueue implements Managable
{
queueSize = queueSize + getMessageSize(message);
}
- return new Long(Math.round(queueSize/100));
+ return (long) Math.round(queueSize/100);
}
// Operations
@@ -292,16 +283,11 @@ public class AMQQueue implements Managable
private long getMessageSize(AMQMessage msg)
{
if (msg == null)
- return 0l;
-
- List<ContentBody> cBodies = msg.getContentBodies();
- long messageSize = 0;
- for (ContentBody body : cBodies)
{
- if (body != null)
- messageSize = messageSize + body.getSize();
+ return 0L;
}
- return messageSize;
+
+ return msg.getContentHeaderBody().bodySize;
}
// Checks if there is any notification to be send to the listeners
@@ -386,7 +372,8 @@ public class AMQQueue implements Managable
if (beginIndex > list.size())
{
- throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue");
+ throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() +
+ " messages in the queue");
}
endIndex = endIndex < list.size() ? endIndex : list.size();
@@ -397,14 +384,15 @@ public class AMQQueue implements Managable
AMQMessage msg = list.get(i - 1);
long msgId = msg.getMessageId();
- List<ContentBody> cBodies = msg.getContentBodies();
+ Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType);
int contentSerialNo = 1;
long size = 0;
- for (ContentBody body : cBodies)
+ while (cBodies.hasNext())
{
+ ContentBody body = cBodies.next();
if (body.getSize() != 0)
{
Byte[] byteArray = getByteArray(body.payload.slice().array());
@@ -665,7 +653,7 @@ public class AMQQueue implements Managable
delete();
}
- public void deliver(AMQMessage msg) throws AMQException
+ /*public void deliver(AMQMessage msg) throws AMQException
{
TxnBuffer buffer = msg.getTxnBuffer();
if(buffer == null)
@@ -678,15 +666,15 @@ public class AMQQueue implements Managable
{
buffer.enlist(new Deliver(msg));
}
- }
+ } */
- private void record(AMQMessage msg) throws AMQException
+ /*private void record(AMQMessage msg) throws AMQException
{
msg.enqueue(this);
msg.incrementReference();
- }
+ } */
- private void process(AMQMessage msg) throws FailedDequeueException
+ public void process(AMQMessage msg) throws FailedDequeueException
{
_deliveryMgr.deliver(getName(), msg);
updateReceivedMessageCount(msg);
@@ -773,45 +761,4 @@ public class AMQQueue implements Managable
_logger.debug(MessageFormat.format(msg, args));
}
}
-
- private class Deliver implements TxnOp
- {
- private final AMQMessage _msg;
-
- Deliver(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- //do the persistent part of the record()
- _msg.enqueue(AMQQueue.this);
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit()
- {
- //do the memeory part of the record()
- _msg.incrementReference();
- //then process the message
- try
- {
- process(_msg);
- }
- catch(FailedDequeueException e)
- {
- //TODO: is there anything else we can do here? I think not...
- _logger.error("Error during commit of a queue delivery: " + e, e);
- }
- }
-
- public void rollback()
- {
- }
- }
-
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
new file mode 100644
index 0000000000..e0e903f21f
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
@@ -0,0 +1,30 @@
+/**
+ * User: Robert Greig
+ * Date: 31-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+
+/**
+ * Constructs a message handle based on the publish body, the content header and the queue to which the message
+ * has been routed.
+ *
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class MessageHandleFactory
+{
+
+ public AMQMessageHandle createMessageHandle(MessageStore store, BasicPublishBody publish,
+ ContentHeaderBody contentHeader)
+ {
+ // just hardcoded for now
+ return new WeakReferenceMessageHandle(store, contentHeader);
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
index 277d7e2793..09f01d6d97 100644
--- a/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
@@ -17,13 +17,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
/**
* Signals that no consumers exist for a message at a given point in time.
@@ -32,19 +27,9 @@ import java.util.List;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
- }
-
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
diff --git a/java/broker/src/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/org/apache/qpid/server/queue/Subscription.java
index 2c073427da..2d5ad8309f 100644
--- a/java/broker/src/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/org/apache/qpid/server/queue/Subscription.java
@@ -25,5 +25,5 @@ public interface Subscription
boolean isSuspended();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue) throws AMQException;
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index b0855999a2..ed00a57dde 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -18,11 +18,7 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -127,8 +123,8 @@ public class SubscriptionImpl implements Subscription
if (msg != null)
{
// if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
+ // we can decrement the reference count immediately.
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -148,10 +144,7 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
else
@@ -170,19 +163,8 @@ public class SubscriptionImpl implements Subscription
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
channel.queueDeleted(queue);
}
-
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
- {
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
- deliveryTag, false, exchange,
- routingKey);
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
index 3b53792234..fe55bd071b 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -18,6 +18,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -166,7 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
* channel, which in turn can update its list of unacknowledged messages.
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
diff --git a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
new file mode 100644
index 0000000000..1228773c37
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -0,0 +1,99 @@
+/**
+ * User: Robert Greig
+ * Date: 23-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+ private ContentHeaderBody _contentHeaderBody;
+
+ private List<ContentBody> _contentBodies;
+
+ private boolean _redelivered;
+
+ private final MessageStore _messageStore;
+
+ public WeakReferenceMessageHandle(MessageStore messageStore, ContentHeaderBody contentHeader)
+ {
+ _messageStore = messageStore;
+ _contentHeaderBody = contentHeader;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public long getBodySize()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ContentBody getContentBody(int index) throws IllegalArgumentException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addContentBodyFrame(ContentBody contentBody) throws AMQException
+ {
+ _contentBodies.add(contentBody);
+ }
+
+ public String getExchangeName()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String getRoutingKey()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ if (_contentHeaderBody == null)
+ {
+ throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
+ }
+
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
+ && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
new file mode 100644
index 0000000000..bc0e993f83
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+ private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final List<RequiredDeliveryException> _returns;
+
+ public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns)
+ {
+ _msg = msg;
+ _returns = returns;
+ }
+
+ public void prepare() throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ //don't need to do anything here, if the store's txn failed
+ //when processing prepare then the message was not stored
+ //or enqueued on any queues and can be discarded
+ }
+
+ public void commit()
+ {
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e);
+ }
+ }
+
+ public void rollback()
+ {
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
new file mode 100644
index 0000000000..baf2333751
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
@@ -0,0 +1,63 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+ private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final AMQQueue _queue;
+
+ public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+ {
+ _msg = msg;
+ _queue = queue;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //do the persistent part of the record()
+ _msg.enqueue(_queue);
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
+ try
+ {
+ _queue.process(_msg);
+ }
+ catch (FailedDequeueException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback()
+ {
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
new file mode 100644
index 0000000000..9d04978b23
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -0,0 +1,118 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+ private final TxnBuffer _txnBuffer;
+
+ /**
+ * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
+ * consolidated into a single operation
+ */
+ private TxAck _ackOp;
+
+ private List<RequiredDeliveryException> _returnMessages;
+
+ public LocalTransactionalContext(TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
+ {
+ _txnBuffer = txnBuffer;
+ _returnMessages = returnMessages;
+ }
+
+ public void rollback() throws AMQException
+ {
+ _txnBuffer.rollback();
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ // don't create a transaction unless needed
+ if (message.isPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+
+ // A publication will result in the enlisting of several
+ // TxnOps. The first is an op that will store the message.
+ // Following that (and ordering is important), an op will
+ // be added for every queue onto which the message is
+ // enqueued. Finally a cleanup op will be added to decrement
+ // the reference associated with the routing.
+ _txnBuffer.enlist(new StoreMessageOperation(message));
+ _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ }
+
+ private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ //check that the tag exists to give early failure
+ if (!multiple || deliveryTag > 0)
+ {
+ checkAck(deliveryTag, unacknowledgedMessageMap);
+ }
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if (_ackOp == null)
+ {
+ _ackOp = new TxAck(unacknowledgedMessageMap);
+ _txnBuffer.enlist(_ackOp);
+ }
+ // update the op to include this ack request
+ if (multiple && deliveryTag == 0)
+ {
+ // if have signalled to ack all, that refers only
+ // to all at this time
+ _ackOp.update(lastDeliveryTag, multiple);
+ }
+ else
+ {
+ _ackOp.update(deliveryTag, multiple);
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ if (_ackOp != null)
+ {
+ _ackOp.consolidate();
+ if (_ackOp.checkPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+ //already enlisted, after commit will reset regardless of outcome
+ _ackOp = null;
+ }
+
+ _txnBuffer.commit();
+ //TODO: may need to return 'immediate' messages at this point
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
new file mode 100644
index 0000000000..381e92de92
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+ private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
+
+ /**
+ * Channel is useful for logging
+ */
+ private AMQChannel _channel;
+
+ /**
+ * Where to put undeliverable messages
+ */
+ private List<RequiredDeliveryException> _returnMessages;
+
+ public NonTransactionalContext(AMQChannel channel, List<RequiredDeliveryException> returnMessages)
+ {
+ _channel = channel;
+ _returnMessages = returnMessages;
+ }
+
+ public void commit() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void rollback() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ try
+ {
+ queue.process(message);
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ _returnMessages.add(e);
+ }
+ finally
+ {
+ message.decrementReference();
+ }
+ }
+
+ public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
+ boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
+ throws AMQException
+ {
+ if (multiple)
+ {
+ if (deliveryTag == 0)
+ {
+
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
+ // tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
+ unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ message.discard();
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ unacknowledgedMessageMap.clear();
+ }
+ });
+ }
+ else
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ }
+
+ LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+ unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+ for (UnacknowledgedMessage msg : acked)
+ {
+ msg.discard();
+ }
+ }
+ }
+ else
+ {
+ UnacknowledgedMessage msg;
+ msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+ if (msg == null)
+ {
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ }
+ msg.discard();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ }
+ }
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
new file mode 100644
index 0000000000..277bb66613
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
@@ -0,0 +1,47 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class StoreMessageOperation implements TxnOp
+{
+ //just use this to do a store of the message during the
+ //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+ //by the queues themselves.
+ private final AMQMessage _msg;
+
+ public StoreMessageOperation(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ public void prepare() throws AMQException
+ {
+ _msg.storeMessage();
+ // the router's reference can now be released
+ _msg.decrementReference();
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
new file mode 100644
index 0000000000..feaf498ad6
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
@@ -0,0 +1,29 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public interface TransactionalContext
+{
+ void commit() throws AMQException;
+
+ void rollback() throws AMQException;
+
+ void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+ void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+}