diff options
author | Robert Greig <rgreig@apache.org> | 2006-11-03 17:17:11 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-11-03 17:17:11 +0000 |
commit | 13c0fdd366018a57ccaabe0aa8a01f752ca27356 (patch) | |
tree | 63a3e754e00c057c71d91a339f177643b1649984 | |
parent | 53da5dbacf15a717760552de283519babd7e4736 (diff) | |
download | qpid-python-13c0fdd366018a57ccaabe0aa8a01f752ca27356.tar.gz |
QPID-32 Commit of work in progress changes. Note that with these changes the tree no longer compiles.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@470908 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 1293 insertions, 720 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 055af16159..748a4c121f 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -19,11 +19,9 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -31,20 +29,16 @@ import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.LocalTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.TxnBuffer; -import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.txn.NonTransactionalContext; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -56,7 +50,7 @@ public class AMQChannel private final int _channelId; - private boolean _transactional; + //private boolean _transactional; private long _prefetchCount; @@ -92,21 +86,18 @@ public class AMQChannel private final MessageStore _messageStore; - private final Object _unacknowledgedMessageMapLock = new Object(); - - private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); - - private long _lastDeliveryTag; + //private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); private final AtomicBoolean _suspended = new AtomicBoolean(false); private final MessageRouter _exchanges; - private final TxnBuffer _txnBuffer; + private TransactionalContext _txnContext; - private TxAck ackOp; + private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); + private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException @@ -115,22 +106,21 @@ public class AMQChannel _prefetchCount = DEFAULT_PREFETCH; _messageStore = messageStore; _exchanges = exchanges; - _txnBuffer = new TxnBuffer(_messageStore); - } - - public int getChannelId() - { - return _channelId; + // TODO: fix me to pass in the txn context or at least have a factory for it + _txnContext = new NonTransactionalContext(this, _returnMessages); } - public boolean isTransactional() + /** + * Sets this channel to be part of a local transaction + */ + public void setLocalTransactional() { - return _transactional; + _txnContext = new LocalTransactionalContext(new TxnBuffer(_messageStore), _returnMessages); } - public void setTransactional(boolean transactional) + public int getChannelId() { - _transactional = transactional; + return _channelId; } public long getPrefetchCount() @@ -145,8 +135,9 @@ public class AMQChannel public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException { - _currentMessage = new AMQMessage(_messageStore, publishBody); - _currentMessage.setPublisher(publisher); + _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody, _txnContext); + // TODO: used in clustering only I think (RG) + //_currentMessage.setPublisher(publisher); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -154,15 +145,18 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content header without previously receiving a BasicDeliver frame"); + throw new AMQException("Received content header without previously receiving a BasicPublish frame"); } else { _currentMessage.setContentHeaderBody(contentHeaderBody); - // check and route if header says body length is zero + routeCurrentMessage(); + _currentMessage.routingComplete(_messageStore, _messageHandleFactory); + + // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { - routeCurrentMessage(); + _currentMessage = null; } } } @@ -174,24 +168,36 @@ public class AMQChannel { throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_currentMessage.getContentHeaderBody() == null) + + // returns true iff the message was delivered (i.e. if all data was + // received + try { - throw new AMQException("Received content body without previously receiving a content header"); + if (_currentMessage.addContentBodyFrame(contentBody)) + { + _currentMessage = null; + } } - - _currentMessage.addContentBodyFrame(contentBody); - if (_currentMessage.isAllContentReceived()) + catch (AMQException e) { - routeCurrentMessage(); + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; } } protected void routeCurrentMessage() throws AMQException { + _exchanges.routeContent(_currentMessage); + } + + /*protected void routeCurrentMessage2() throws AMQException + { if (_transactional) { - //don't create a transaction unless needed - if(_currentMessage.isPersistent()) + // don't create a transaction unless needed + if (_currentMessage.isPersistent()) { _txnBuffer.containsPersistentChanges(); } @@ -210,7 +216,7 @@ public class AMQChannel _exchanges.routeContent(_currentMessage); _txnBuffer.enlist(new Cleanup(_currentMessage)); } - catch(RequiredDeliveryException e) + catch (RequiredDeliveryException e) { //Can only be due to the mandatory flag, as no attempt //has yet been made to deliver the message. The @@ -236,12 +242,12 @@ public class AMQChannel _currentMessage.checkDeliveredToConsumer(); } finally - { + { _currentMessage.decrementReference(); _currentMessage = null; } } - } + } */ public long getNextDeliveryTag() { @@ -266,7 +272,8 @@ public class AMQChannel * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) + throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -304,13 +311,14 @@ public class AMQChannel */ public void close(AMQProtocolSession session) throws AMQException { - if (_transactional) + /*if (_transactional) { - synchronized(_txnBuffer) + synchronized (_txnBuffer) { _txnBuffer.rollback();//releases messages } - } + } */ + _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); } @@ -328,39 +336,32 @@ public class AMQChannel /** * Add a message to the channel-based list of unacknowledged messages * - * @param message - * @param deliveryTag - * @param queue + * @param message the message that was delivered + * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of + * the delivery tag) + * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized(_unacknowledgedMessageMapLock) - { - _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); - _lastDeliveryTag = deliveryTag; - checkSuspension(); - } + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); + checkSuspension(); } /** * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. * May result in delivery to this same channel or to other subscribers. + * @throws org.apache.qpid.AMQException if the requeue fails */ public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Map<Long, UnacknowledgedMessage> currentList; - synchronized(_unacknowledgedMessageMapLock) - { - currentList = _unacknowledgedMessageMap; - _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); - } + Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - for (UnacknowledgedMessage unacked : currentList.values()) + for (UnacknowledgedMessage unacked : messagesToBeDelivered) { if (unacked.queue != null) { - unacked.queue.deliver(unacked.message); + _txnContext.deliver(unacked.message, unacked.queue); } } } @@ -368,53 +369,62 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(AMQProtocolSession session) + public void resend(final AMQProtocolSession session) throws AMQException { - //messages go to this channel - synchronized(_unacknowledgedMessageMapLock) + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) + public boolean callback(UnacknowledgedMessage message) { - long deliveryTag = entry.getKey(); - String consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; + long deliveryTag = message.deliveryTag; + String consumerTag = message.consumerTag; + AMQMessage msg = message.message; - session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + // false means continue processing + return false; } - } + + public void visitComplete() + { + } + }); } /** * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged * messages to remove the queue reference and also decrement any message reference counts, without - * actually removing the item sine we may get an ack for a delivery tag that was generated from the + * actually removing the item since we may get an ack for a delivery tag that was generated from the * deleted queue. * - * @param queue + * @param queue the queue that has been deleted + * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(final AMQQueue queue) throws AMQException { - synchronized(_unacknowledgedMessageMapLock) + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - final UnacknowledgedMessage unackedMsg = unacked.getValue(); - // we can compare the reference safely in this case - if (unackedMsg.queue == queue) + if (message.queue == queue) { - unackedMsg.queue = null; + message.queue = null; try { - unackedMsg.message.decrementReference(); + message.message.decrementReference(); } catch (AMQException e) { - _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + + _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); } } + return false; } - } + + public void visitComplete() + { + } + }); } /** @@ -427,20 +437,22 @@ public class AMQChannel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - if (_transactional) + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); + checkSuspension(); + /*if (_transactional) { //check that the tag exists to give early failure - if(!multiple || deliveryTag > 0) + if (!multiple || deliveryTag > 0) { checkAck(deliveryTag); } //we use a single txn op for all acks and update this op //as new acks come in. If this is the first ack in the txn //we will need to create and enlist the op. - if(ackOp == null) + if (_ackOp == null) { - ackOp = new TxAck(new AckMap()); - _txnBuffer.enlist(ackOp); + _ackOp = new TxAck(new AckMap()); + _txnBuffer.enlist(_ackOp); } //update the op to include this ack request if(multiple && deliveryTag == 0) @@ -449,108 +461,18 @@ public class AMQChannel { //if have signalled to ack all, that refers only //to all at this time - ackOp.update(_lastDeliveryTag, multiple); + _ackOp.update(_lastDeliveryTag, multiple); } } else { - ackOp.update(deliveryTag, multiple); + _ackOp.update(deliveryTag, multiple); } } else { handleAcknowledgement(deliveryTag, multiple); - } - } - - private void checkAck(long deliveryTag) throws AMQException - { - synchronized(_unacknowledgedMessageMapLock) - { - if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) - { - throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); - } - } - } - - private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException - { - if (multiple) - { - LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized(_unacknowledgedMessageMapLock) - { - if (deliveryTag == 0) - { - //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages. - _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); - acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values()); - _unacknowledgedMessageMap.clear(); - } - else - { - if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) - { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); - } - Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); - - while (i.hasNext()) - { - - Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); - - if (unacked.getKey() > deliveryTag) - { - //This should not occur now. - throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); - } - - i.remove(); - - acked.add(unacked.getValue()); - if (unacked.getKey() == deliveryTag) - { - break; - } - } - } - }// synchronized - - if (_log.isDebugEnabled()) - { - _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); - } - - for (UnacknowledgedMessage msg : acked) - { - msg.discard(); - } - - } - else - { - UnacknowledgedMessage msg; - synchronized(_unacknowledgedMessageMapLock) - { - msg = _unacknowledgedMessageMap.remove(deliveryTag); - } - - if (msg == null) - { - _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); - throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); - } - msg.discard(); - if (_log.isDebugEnabled()) - { - _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag); - } - } - - checkSuspension(); + } */ } /** @@ -558,19 +480,14 @@ public class AMQChannel * * @return the map of unacknowledged messages */ - public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap() + public UnacknowledgedMessageMap getUnacknowledgedMessageMap() { return _unacknowledgedMessageMap; } private void checkSuspension() { - boolean suspend; - //noinspection SynchronizeOnNonFinalField - synchronized(_unacknowledgedMessageMapLock) - { - suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; - } + boolean suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; setSuspended(suspend); } @@ -602,33 +519,18 @@ public class AMQChannel public void commit() throws AMQException { - if(ackOp != null) - { - ackOp.consolidate(); - if(ackOp.checkPersistent()) - { - _txnBuffer.containsPersistentChanges(); - } - ackOp = null;//already enlisted, after commit will reset regardless of outcome - } - - _txnBuffer.commit(); - //TODO: may need to return 'immediate' messages at this point + _txnContext.commit(); } public void rollback() throws AMQException { - //need to protect rollback and close from each other... - synchronized(_txnBuffer) - { - _txnBuffer.rollback(); - } + _txnContext.rollback(); } public String toString() { StringBuilder sb = new StringBuilder(30); - sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional); + sb.append("Channel: id ").append(_channelId).append(", transaction context: ").append(_txnContext); sb.append(", prefetch count: ").append(_prefetchCount); return sb.toString(); } @@ -638,7 +540,6 @@ public class AMQChannel { StringBuilder sb = new StringBuilder(30); sb.append("Channel:id=").append(_channelId); - sb.append(",transaction mode=").append(_transactional); return new ObjectName(sb.toString()); } @@ -654,112 +555,11 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) { - for(AMQDataBlock block : _returns) - { - session.writeFrame(block); - } - _returns.clear(); - } - - //we use this wrapper to ensure we are always using the correct - //map instance (its not final unfortunately) - private class AckMap implements UnacknowledgedMessageMap - { - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) - { - impl().collect(deliveryTag, multiple, msgs); - } - public void remove(List<UnacknowledgedMessage> msgs) - { - impl().remove(msgs); - } - - private UnacknowledgedMessageMap impl() - { - return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap); - } - } - - private class Store implements TxnOp - { - //just use this to do a store of the message during the - //prepare phase. Any enqueueing etc is done by TxnOps enlisted - //by the queues themselves. - private final AMQMessage _msg; - - Store(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - _msg.storeMessage(); - //the routers reference can now be released - _msg.decrementReference(); - } - - public void undoPrepare() - { - } - - public void commit() - { - } - - public void rollback() - { - } - } - - private class Cleanup implements TxnOp - { - private final AMQMessage _msg; - - Cleanup(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - } - - public void undoPrepare() - { - //don't need to do anything here, if the store's txn failed - //when processing prepare then the message was not stored - //or enqueued on any queues and can be discarded - } - - public void commit() - { - //The routers reference can now be released. This is done - //here to ensure that it happens after the queues that - //enqueue it have incremented their counts (which as a - //memory only operation is done in the commit phase). - try - { - _msg.decrementReference(); - } - catch(AMQException e) - { - _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); - } - try - { - _msg.checkDeliveredToConsumer(); - } - catch(NoConsumersException e) - { - //TODO: store this for delivery after the commit-ok - _returns.add(e.getReturnMessage(_channelId)); - } - } - - public void rollback() + for (RequiredDeliveryException bouncedMessage : _returnMessages) { + AMQMessage message = bouncedMessage.getAMQMessage(); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), bouncedMessage.getMessage()); } + _returnMessages.clear(); } - } diff --git a/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java index 6c4fb6b730..d2b017275a 100644 --- a/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java @@ -17,17 +17,9 @@ */ package org.apache.qpid.server; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; -import java.util.List; - /** * Signals that a required delivery could not be made. This could be bacuse of * the immediate flag being set and the queue having no consumers, or the mandatory @@ -35,75 +27,23 @@ import java.util.List; */ public abstract class RequiredDeliveryException extends AMQException { - private final String _message; - private final BasicPublishBody _publishBody; - private final ContentHeaderBody _contentHeaderBody; - private final List<ContentBody> _contentBodies; + private final AMQMessage _amqMessage; public RequiredDeliveryException(String message, AMQMessage payload) { super(message); - _message = message; - _publishBody = payload.getPublishBody(); - _contentHeaderBody = payload.getContentHeaderBody(); - _contentBodies = payload.getContentBodies(); - } - - public RequiredDeliveryException(String message, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) - { - super(message); - _message = message; - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; - _contentBodies = contentBodies; + _amqMessage = payload; } - public BasicPublishBody getPublishBody() + public AMQMessage getAMQMessage() { - return _publishBody; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public List<ContentBody> getContentBodies() - { - return _contentBodies; - } - - public CompositeAMQDataBlock getReturnMessage(int channel) - { - BasicReturnBody returnBody = new BasicReturnBody(); - returnBody.exchange = _publishBody.exchange; - returnBody.replyCode = getReplyCode(); - returnBody.replyText = _message; - returnBody.routingKey = _publishBody.routingKey; - - AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - - AMQFrame returnFrame = new AMQFrame(); - returnFrame.bodyFrame = returnBody; - returnFrame.channel = channel; - - allFrames[0] = returnFrame; - allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 2; i < allFrames.length; i++) - { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2)); - } - - return new CompositeAMQDataBlock(allFrames); + return _amqMessage; } public int getErrorCode() { return getReplyCode(); - } + } public abstract int getReplyCode(); } diff --git a/java/broker/src/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/org/apache/qpid/server/ack/TxAck.java index 0d502a8f6e..85072b6976 100644 --- a/java/broker/src/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/org/apache/qpid/server/ack/TxAck.java @@ -17,19 +17,15 @@ */ package org.apache.qpid.server.ack; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.TxnOp; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; /** * A TxnOp implementation for handling accumulated acks - */ + */ public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; @@ -45,14 +41,14 @@ public class TxAck implements TxnOp public void update(long deliveryTag, boolean multiple) { - if(!multiple) + if (!multiple) { //have acked a single message that is not part of //the previously acked region so record //individually _individual.add(deliveryTag);//_multiple && !multiple } - else if(deliveryTag > _deliveryTag) + else if (deliveryTag > _deliveryTag) { //have simply moved the last acked message on a //bit @@ -64,7 +60,7 @@ public class TxAck implements TxnOp public void consolidate() { //lookup all the unacked messages that have been acked in this transaction - if(_multiple) + if (_multiple) { //get all the unacked messages for the accumulated //multiple acks @@ -72,22 +68,22 @@ public class TxAck implements TxnOp } //get any unacked messages for individual acks outside the //range covered by multiple acks - for(long tag : _individual) + for (long tag : _individual) { if(_deliveryTag < tag) { - _map.collect(tag, false, _unacked); + _map.collect(tag, false, _unacked); } } } public boolean checkPersistent() throws AMQException - { + { //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: - for(UnacknowledgedMessage msg : _unacked) + for (UnacknowledgedMessage msg : _unacked) { - if(msg.message.isPersistent()) + if (msg.message.isPersistent()) { return true; } @@ -98,22 +94,22 @@ public class TxAck implements TxnOp public void prepare() throws AMQException { //make persistent changes, i.e. dequeue and decrementReference - for(UnacknowledgedMessage msg : _unacked) + for (UnacknowledgedMessage msg : _unacked) { msg.discard(); } } - + public void undoPrepare() { //decrementReference is annoyingly untransactional (due to //in memory counter) so if we failed in prepare for full //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for(UnacknowledgedMessage msg : _unacked) + //in memory (persistent changes will be rolled back by store) + for (UnacknowledgedMessage msg : _unacked) { msg.message.incrementReference(); - } + } } public void commit() diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index ccb55f7e2d..8fece7bcc1 100644 --- a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -17,11 +17,46 @@ */ package org.apache.qpid.server.ack; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.AMQException; + +import java.util.Collection; import java.util.List; public interface UnacknowledgedMessageMap { - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); - public void remove(List<UnacknowledgedMessage> msgs); + public interface Visitor + { + /** + * @param message the message being iterated over + * @return true to stop iteration, false to continue + * @throws AMQException + */ + boolean callback(UnacknowledgedMessage message) throws AMQException; + + void visitComplete(); + } + + void visit(Visitor visitor) throws AMQException; + + void add(long deliveryTag, UnacknowledgedMessage message); + + void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); + + boolean contains(long deliveryTag) throws AMQException; + + void remove(List<UnacknowledgedMessage> msgs); + + UnacknowledgedMessage remove(long deliveryTag); + + void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException; + + Collection<UnacknowledgedMessage> cancelAllMessages(); + + void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException; + + int size(); + + void clear(); } diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 117793930f..906ef9463c 100644 --- a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -17,19 +17,34 @@ */ package org.apache.qpid.server.ack; -import java.util.List; -import java.util.Map; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.AMQException; + +import java.util.*; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { - private final Object _lock; + private final Object _lock = new Object(); + private Map<Long, UnacknowledgedMessage> _map; - public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) + private long _lastDeliveryTag; + + private final int _prefetchLimit; + + public UnacknowledgedMessageMapImpl(int prefetchLimit) + { + _prefetchLimit = prefetchLimit; + _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit); + } + + /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) { _lock = lock; _map = map; - } + } */ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) { @@ -44,20 +59,133 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } + public boolean contains(long deliveryTag) throws AMQException + { + synchronized (_lock) + { + return _map.containsKey(deliveryTag); + } + } + public void remove(List<UnacknowledgedMessage> msgs) { - synchronized(_lock) + synchronized (_lock) { for(UnacknowledgedMessage msg : msgs) { _map.remove(msg.deliveryTag); - } + } + } + } + + public UnacknowledgedMessage remove(long deliveryTag) + { + synchronized (_lock) + { + return _map.remove(deliveryTag); } } + public void visit(Visitor visitor) throws AMQException + { + synchronized (_lock) + { + Collection<UnacknowledgedMessage> currentEntries = _map.values(); + for (UnacknowledgedMessage msg: currentEntries) + { + visitor.callback(msg); + } + } + } + + public void add(long deliveryTag, UnacknowledgedMessage message) + { + synchronized( _lock) + { + _map.put(deliveryTag, message); + _lastDeliveryTag = deliveryTag; + } + } + + public Collection<UnacknowledgedMessage> cancelAllMessages() + { + synchronized (_lock) + { + Collection<UnacknowledgedMessage> currentEntries = _map.values(); + _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit); + return currentEntries; + } + } + + public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) + throws AMQException + { + synchronized (_lock) + { + txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this); + } + } + + public int size() + { + synchronized (_lock) + { + return _map.size(); + } + } + + public void clear() + { + synchronized (_lock) + { + _map.clear(); + } + } + + public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException + { + synchronized (_lock) + { + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry<Long, UnacknowledgedMessage> unacked = it.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); + } + + it.remove(); + + destination.add(unacked.getValue()); + if (unacked.getKey() == deliveryTag) + { + break; + } + } + } + } + + public void resendMessages(AMQProtocolSession protocolSession, int channelId) + { + synchronized (_lock) + { + for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) + { + long deliveryTag = entry.getKey(); + String consumerTag = entry.getValue().consumerTag; + AMQMessage msg = entry.getValue().message; + + msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + } + } + } private UnacknowledgedMessage get(long key) { - synchronized(_lock) + synchronized (_lock) { return _map.get(key); } @@ -65,7 +193,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap private void collect(long key, List<UnacknowledgedMessage> msgs) { - synchronized(_lock) + synchronized (_lock) { for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) { @@ -73,9 +201,11 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if (entry.getKey() == key) { break; - } + } } } } + + } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 06a3944367..62b0415253 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -17,10 +17,10 @@ */ package org.apache.qpid.server.exchange; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.log4j.Logger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -81,8 +81,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry final String exchange = payload.getPublishBody().exchange; final Exchange exch = _exchangeMap.get(exchange); // there is a small window of opportunity for the exchange to be deleted in between - // the JmsPublish being received (where the exchange is validated) and the final + // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) + // TODO: check where the exchange is validated if (exch == null) { throw new AMQException("Exchange '" + exchange + "' does not exist"); diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java index f89223825f..e6cc683b15 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java @@ -212,8 +212,8 @@ public class DestNameExchange extends AbstractExchange } for (AMQQueue q : queues) - { - q.deliver(payload); + { + payload.registerQueue(q); } } } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java index 25d91027e9..6e99eb5c21 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java @@ -183,8 +183,8 @@ public class DestWildExchange extends AbstractExchange { // TODO: modify code generator to add clone() method then clone the deliver body // without this addition we have a race condition - we will be modifying the body - // before the encoder has encoded the body for delivery - q.deliver(payload); + // before the encoder has encoded the body for delivery + payload.registerQueue(q); } } diff --git a/java/broker/src/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/org/apache/qpid/server/exchange/Exchange.java index cd75825601..02a0badb65 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/Exchange.java @@ -19,8 +19,8 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; public interface Exchange { diff --git a/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java index 1167d05875..3b19c11ae4 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java @@ -175,7 +175,7 @@ public class HeadersExchange extends AbstractExchange { _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } - boolean delivered = false; + boolean routed = false; for (Registration e : _bindings) { if (e.binding.matches(headers)) @@ -185,11 +185,11 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": delivering message with headers " + headers + " to " + e.queue.getName()); } - e.queue.deliver(payload); - delivered = true; + payload.registerQueue(e.queue); + routed = true; } } - if (!delivered) + if (!routed) { _logger.warn("Exchange " + getName() + ": message not routable."); } diff --git a/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java b/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java index 2eef5f0676..832a60c681 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java +++ b/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java @@ -17,8 +17,8 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; /** * Separated out from the ExchangeRegistry interface to allow components @@ -30,6 +30,7 @@ public interface MessageRouter /** * Routes content through exchanges, delivering it to 1 or more queues. * @param message the message to be routed + * * @throws org.apache.qpid.AMQException if something goes wrong delivering data */ void routeContent(AMQMessage message) throws AMQException; diff --git a/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java index d55930489c..cf665950ca 100644 --- a/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java @@ -44,7 +44,7 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<TxSelectBody> evt) throws AMQException { - protocolSession.getChannel(evt.getChannelId()).setTransactional(true); + protocolSession.getChannel(evt.getChannelId()).setLocalTransactional(); protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId())); } } diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 35bc7fb4c7..9ede6157f4 100644 --- a/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -25,44 +25,17 @@ import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.management.*; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.Notification; -import javax.management.ObjectName; +import javax.management.*; import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; +import javax.management.openmbean.*; import javax.security.sasl.SaslServer; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -104,9 +77,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private boolean _closed; private long _maxNoOfChannels; - + /* AMQP Version for this session */ - + private byte _major; private byte _minor; @@ -240,10 +213,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - if (channel.isTransactional()) - { - channel.commit(); - } + channel.commit(); } catch(AMQException ex) { @@ -260,10 +230,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - if (channel.isTransactional()) - { - channel.rollback(); - } + channel.rollback(); } catch(AMQException ex) { @@ -297,7 +264,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Object[] itemValues = {channel.getChannelId(), channelObjectName, - channel.isTransactional(), (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, channel.getUnacknowledgedMessageMap().size()}; @@ -448,17 +414,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } else { - try - { - contentFrameReceived(frame); - } - catch (RequiredDeliveryException e) - { - //need to return the message: - _logger.info("Returning message to " + this + " channel " + frame.channel - + ": " + e.getMessage()); - writeFrame(e.getReturnMessage(frame.channel)); - } + contentFrameReceived(frame); } } } @@ -539,9 +495,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * to calling getProtocolSession().writeDeliver(). * - * @param frame the frame to write + * @param frame the frame to writeDeliver */ public void writeFrame(AMQDataBlock frame) { @@ -702,22 +658,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _saslServer = saslServer; } - + /** * Convenience methods for managing AMQP version. * NOTE: Both major and minor will be set to 0 prior to protocol initiation. */ - + public byte getAmqpMajor() { return _major; } - + public byte getAmqpMinor() { return _minor; } - + public boolean amqpVersionEquals(byte major, byte minor) { return _major == major && _minor == minor; diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 4478a0cf32..78fb76ae27 100644 --- a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -139,7 +139,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco _logger.debug("Protocol Session [" + this + "] idle: " + status); if(IdleStatus.WRITER_IDLE.equals(status)) { - //write heartbeat frame: + //writeDeliver heartbeat frame: session.write(HeartbeatBody.FRAME); } else if(IdleStatus.READER_IDLE.equals(status)) @@ -165,7 +165,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else if(throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); } else { diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java index 7666b5b3f8..825d4feae4 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java @@ -18,17 +18,13 @@ package org.apache.qpid.server.queue; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.txn.TxnBuffer; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.txn.TransactionalContext; -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; -import java.util.Set; -import java.util.HashSet; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; /** @@ -36,43 +32,35 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class AMQMessage { + /** + * Used in clustering + */ private final Set<Object> _tokens = new HashSet<Object>(); + /** + * Used in clustering + * TODO need to get rid of this + */ private AMQProtocolSession _publisher; - private final BasicPublishBody _publishBody; - - private ContentHeaderBody _contentHeaderBody; - - private List<ContentBody> _contentBodies; - - private boolean _redelivered; - private final long _messageId; private final AtomicInteger _referenceCount = new AtomicInteger(1); - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; + private AMQMessageHandle _messageHandle; /** - * The message store in which this message is contained. + * Stored temporarily until the header has been received at which point it is used when + * constructing the handle */ - private transient final MessageStore _store; + private BasicPublishBody _publishBody; /** - * For non transactional publishes, a message can be stored as - * soon as it is complete. For transactional messages it doesnt - * need to be stored until the transaction is committed. + * Keeps a track of how many bytes we have received in body frames */ - private boolean _storeWhenComplete; + private long _bodyLengthReceived = 0; - /** - * TxnBuffer for transactionally published messages - */ - private TxnBuffer _txnBuffer; + private final TransactionalContext _txnContext; /** * Flag to indicate whether message has been delivered to a @@ -81,55 +69,97 @@ public class AMQMessage */ private boolean _deliveredToConsumer; + private ContentHeaderBody _contentHeaderBody; - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) - { - this(messageStore, publishBody, true); - } + private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete) + /** + * Used to iterate through all the body frames associated with this message. Will not + * keep all the data in memory therefore is memory-efficient. + */ + private class BodyFrameIterator implements Iterator<AMQDataBlock> { - _messageId = messageStore.getNewMessageId(); - _publishBody = publishBody; - _store = messageStore; - _contentBodies = new LinkedList<ContentBody>(); - _storeWhenComplete = storeWhenComplete; + private int _channel; + + private int _index = -1; + + private BodyFrameIterator(int channel) + { + _channel = channel; + } + + public boolean hasNext() + { + return _index < _messageHandle.getBodyCount(); + } + + public AMQDataBlock next() + { + return ContentBody.createAMQFrame(_channel, _messageHandle.getContentBody(++_index)); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } } - public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) - throws AMQException - + private class BodyContentIterator implements Iterator<ContentBody> { - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; - _contentBodies = contentBodies; - _messageId = messageId; - _store = store; - storeMessage(); + + private int _index = -1; + + public boolean hasNext() + { + return _index < _messageHandle.getBodyCount(); + } + + public ContentBody next() + { + return _messageHandle.getContentBody(++_index); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } } - public AMQMessage(MessageStore store, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) - throws AMQException + public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) { - this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); + _messageId = messageId; + _txnContext = txnContext; + _publishBody = publishBody; } protected AMQMessage(AMQMessage msg) throws AMQException { - this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies); + _publisher = msg._publisher; + _messageId = msg._messageId; + _messageHandle = msg._messageHandle; + _txnContext = msg._txnContext; + _deliveredToConsumer = msg._deliveredToConsumer; } public void storeMessage() throws AMQException { - if (isPersistent()) + /*if (isPersistent()) { _store.put(this); - } + } */ + } + + public Iterator<AMQDataBlock> getBodyFrameIterator(int channel) + { + return new BodyFrameIterator(channel); + } + + public Iterator<ContentBody> getContentBodyIterator() + { + return new BodyContentIterator(); } - public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel) + /*public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel) { AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()]; @@ -167,80 +197,76 @@ public class AMQMessage public BasicPublishBody getPublishBody() { return _publishBody; - } + } */ public ContentHeaderBody getContentHeaderBody() { - return _contentHeaderBody; + return _messageHandle.getContentHeaderBody(); } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) + throws AMQException { _contentHeaderBody = contentHeaderBody; - if (_storeWhenComplete && isAllContentReceived()) - { - storeMessage(); - } } - public List<ContentBody> getContentBodies() + public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException { - return _contentBodies; - } - - public void setContentBodies(List<ContentBody> contentBodies) - { - _contentBodies = contentBodies; + _messageHandle = factory.createMessageHandle(store, _publishBody, _contentHeaderBody); + if (_contentHeaderBody.bodySize == 0) + { + deliver(); + } } - public void addContentBodyFrame(ContentBody contentBody) throws AMQException + public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException { - _contentBodies.add(contentBody); _bodyLengthReceived += contentBody.getSize(); - if (_storeWhenComplete && isAllContentReceived()) + _messageHandle.addContentBodyFrame(contentBody); + if (isAllContentReceived()) { - storeMessage(); + deliver(); + return true; + } + else + { + return false; } } public boolean isAllContentReceived() { - return _bodyLengthReceived == _contentHeaderBody.bodySize; + return _bodyLengthReceived == _messageHandle.getBodySize(); } public boolean isRedelivered() { - return _redelivered; + return _messageHandle.isRedelivered(); } String getExchangeName() { - return _publishBody.exchange; + return _messageHandle.getExchangeName(); } String getRoutingKey() { - return _publishBody.routingKey; + return _messageHandle.getRoutingKey(); } boolean isImmediate() { - return _publishBody.immediate; - } - - NoConsumersException getNoConsumersException(String queue) - { - return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies); + return _messageHandle.isImmediate(); } - void setRedelivered(boolean redelivered) + public long getMessageId() { - _redelivered = redelivered; + return _messageId; } - public long getMessageId() + public BasicPublishBody getPublishBody() { - return _messageId; + return _publishBody; } /** @@ -254,6 +280,9 @@ public class AMQMessage /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. + * + * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed */ public void decrementReference() throws MessageCleanupException { @@ -261,19 +290,19 @@ public class AMQMessage // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after // the message has been passed to all queues. i.e. we are // not relying on the all the increments having taken place before the delivery manager decrements. - if (_referenceCount.decrementAndGet() == 0) + /*if (_referenceCount.decrementAndGet() == 0) { try { _store.removeMessage(_messageId); } - catch(AMQException e) + catch (AMQException e) { //to maintain consistency, we revert the count incrementReference(); throw new MessageCleanupException(_messageId, e); } - } + } */ } public void setPublisher(AMQProtocolSession publisher) @@ -288,7 +317,7 @@ public class AMQMessage public boolean checkToken(Object token) { - if(_tokens.contains(token)) + if (_tokens.contains(token)) { return true; } @@ -304,54 +333,40 @@ public class AMQMessage //if the message is not persistent or the queue is not durable //we will not need to recover the association and so do not //need to record it - if(isPersistent() && queue.isDurable()) + /*if (isPersistent() && queue.isDurable()) { _store.enqueueMessage(queue.getName(), _messageId); - } + } */ } public void dequeue(AMQQueue queue) throws AMQException { //only record associations where both queue and message will survive //a restart, so only need to remove association if this is the case - if(isPersistent() && queue.isDurable()) + /*if (isPersistent() && queue.isDurable()) { _store.dequeueMessage(queue.getName(), _messageId); - } + } */ } public boolean isPersistent() throws AMQException { - if(_contentHeaderBody == null) - { - throw new AMQException("Cannot determine delivery mode of message. Content header not found."); - } + return _messageHandle.isPersistent(); - //todo remove literal values to a constant file such as AMQConstants in common - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties - &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; - } - - public void setTxnBuffer(TxnBuffer buffer) - { - _txnBuffer = buffer; - } - - public TxnBuffer getTxnBuffer() - { - return _txnBuffer; } /** - * Called to enforce the 'immediate' flag. + * Called to enforce the 'immediate' flag. + * * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * immediate delivery but has not been marked as delivered to a + * consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException{ - if(isImmediate() && !_deliveredToConsumer) + public void checkDeliveredToConsumer() throws NoConsumersException + { + if (isImmediate() && !_deliveredToConsumer) { - throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); + throw new NoConsumersException(this); } } @@ -359,7 +374,121 @@ public class AMQMessage * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). */ - public void setDeliveredToConsumer(){ + public void setDeliveredToConsumer() + { _deliveredToConsumer = true; } + + /** + * Registers a queue to which this message is to be delivered. This is + * called from the exchange when it is routing the message. This will be called before any content bodies have + * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria. + * + * @param queue the queue + */ + public void registerQueue(AMQQueue queue) + { + _destinationQueues.add(queue); + } + + private void deliver() throws AMQException + { + for (AMQQueue q : _destinationQueues) + { + _txnContext.deliver(this, q); + } + } + + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag) + { + ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + protocolSession.writeFrame(bodyFrameIterator.next()); + } + + } + + private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag) + { + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, consumerTag, + deliveryTag, false, getExchangeName(), + getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; + } + + private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) + { + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, _publishBody.exchange, + _publishBody.routingKey); + ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? + returnFrame.writePayload(buf); + buf.flip(); + return buf; + } + + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText) + { + ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + protocolSession.writeFrame(bodyFrameIterator.next()); + } + } } diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java new file mode 100644 index 0000000000..62ad28237f --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -0,0 +1,53 @@ +/** + * User: Robert Greig + * Date: 23-Oct-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public interface AMQMessageHandle +{ + ContentHeaderBody getContentHeaderBody(); + + void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException; + + /** + * @return the number of body frames associated with this message + */ + int getBodyCount(); + + /** + * @return the size of the body + */ + long getBodySize(); + + /** + * Get a particular content body + * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1 + * @return a content body + * @throws IllegalArgumentException if the index is invalid + */ + ContentBody getContentBody(int index) throws IllegalArgumentException; + + void addContentBodyFrame(ContentBody contentBody) throws AMQException; + + String getExchangeName(); + + String getRoutingKey(); + + boolean isImmediate(); + + boolean isRedelivered(); + + boolean isPersistent() throws AMQException; +}
\ No newline at end of file diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java index 7fb6ddcb5c..914dafe6e0 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java @@ -21,24 +21,15 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.management.*; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.txn.TxnBuffer; -import org.apache.qpid.server.txn.TxnOp; - -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.NotCompliantMBeanException; -import javax.management.Notification; + +import javax.management.*; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.*; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Executor; @@ -284,7 +275,7 @@ public class AMQQueue implements Managable { queueSize = queueSize + getMessageSize(message); } - return new Long(Math.round(queueSize/100)); + return (long) Math.round(queueSize/100); } // Operations @@ -292,16 +283,11 @@ public class AMQQueue implements Managable private long getMessageSize(AMQMessage msg) { if (msg == null) - return 0l; - - List<ContentBody> cBodies = msg.getContentBodies(); - long messageSize = 0; - for (ContentBody body : cBodies) { - if (body != null) - messageSize = messageSize + body.getSize(); + return 0L; } - return messageSize; + + return msg.getContentHeaderBody().bodySize; } // Checks if there is any notification to be send to the listeners @@ -386,7 +372,8 @@ public class AMQQueue implements Managable if (beginIndex > list.size()) { - throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue"); + throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + + " messages in the queue"); } endIndex = endIndex < list.size() ? endIndex : list.size(); @@ -397,14 +384,15 @@ public class AMQQueue implements Managable AMQMessage msg = list.get(i - 1); long msgId = msg.getMessageId(); - List<ContentBody> cBodies = msg.getContentBodies(); + Iterator<ContentBody> cBodies = msg.getContentBodyIterator(); TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType); int contentSerialNo = 1; long size = 0; - for (ContentBody body : cBodies) + while (cBodies.hasNext()) { + ContentBody body = cBodies.next(); if (body.getSize() != 0) { Byte[] byteArray = getByteArray(body.payload.slice().array()); @@ -665,7 +653,7 @@ public class AMQQueue implements Managable delete(); } - public void deliver(AMQMessage msg) throws AMQException + /*public void deliver(AMQMessage msg) throws AMQException { TxnBuffer buffer = msg.getTxnBuffer(); if(buffer == null) @@ -678,15 +666,15 @@ public class AMQQueue implements Managable { buffer.enlist(new Deliver(msg)); } - } + } */ - private void record(AMQMessage msg) throws AMQException + /*private void record(AMQMessage msg) throws AMQException { msg.enqueue(this); msg.incrementReference(); - } + } */ - private void process(AMQMessage msg) throws FailedDequeueException + public void process(AMQMessage msg) throws FailedDequeueException { _deliveryMgr.deliver(getName(), msg); updateReceivedMessageCount(msg); @@ -773,45 +761,4 @@ public class AMQQueue implements Managable _logger.debug(MessageFormat.format(msg, args)); } } - - private class Deliver implements TxnOp - { - private final AMQMessage _msg; - - Deliver(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - //do the persistent part of the record() - _msg.enqueue(AMQQueue.this); - } - - public void undoPrepare() - { - } - - public void commit() - { - //do the memeory part of the record() - _msg.incrementReference(); - //then process the message - try - { - process(_msg); - } - catch(FailedDequeueException e) - { - //TODO: is there anything else we can do here? I think not... - _logger.error("Error during commit of a queue delivery: " + e, e); - } - } - - public void rollback() - { - } - } - } diff --git a/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java new file mode 100644 index 0000000000..e0e903f21f --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -0,0 +1,30 @@ +/** + * User: Robert Greig + * Date: 31-Oct-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.store.MessageStore; + +/** + * Constructs a message handle based on the publish body, the content header and the queue to which the message + * has been routed. + * + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class MessageHandleFactory +{ + + public AMQMessageHandle createMessageHandle(MessageStore store, BasicPublishBody publish, + ContentHeaderBody contentHeader) + { + // just hardcoded for now + return new WeakReferenceMessageHandle(store, contentHeader); + } +} diff --git a/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java index 277d7e2793..09f01d6d97 100644 --- a/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java @@ -17,13 +17,8 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQConstant; - -import java.util.List; +import org.apache.qpid.server.RequiredDeliveryException; /** * Signals that no consumers exist for a message at a given point in time. @@ -32,19 +27,9 @@ import java.util.List; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(String queue, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) - { - super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies); - } - - public NoConsumersException(BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(AMQMessage message) { - super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery is not possible.", message); } public int getReplyCode() diff --git a/java/broker/src/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/org/apache/qpid/server/queue/Subscription.java index 2c073427da..2d5ad8309f 100644 --- a/java/broker/src/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/org/apache/qpid/server/queue/Subscription.java @@ -25,5 +25,5 @@ public interface Subscription boolean isSuspended(); - void queueDeleted(AMQQueue queue); + void queueDeleted(AMQQueue queue) throws AMQException; } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index b0855999a2..ed00a57dde 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -18,11 +18,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -127,8 +123,8 @@ public class SubscriptionImpl implements Subscription if (msg != null) { // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - + // we can decrement the reference count immediately. + // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. @@ -148,10 +144,7 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - - protocolSession.writeFrame(frame); + msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); } } else @@ -170,19 +163,8 @@ public class SubscriptionImpl implements Subscription * * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { channel.queueDeleted(queue); } - - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) - { - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, - deliveryTag, false, exchange, - routingKey); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - } } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java index 3b53792234..fe55bd071b 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java @@ -18,6 +18,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; @@ -166,7 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager * channel, which in turn can update its list of unacknowledged messages. * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { for (Subscription s : _subscriptions) { diff --git a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java new file mode 100644 index 0000000000..1228773c37 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -0,0 +1,99 @@ +/** + * User: Robert Greig + * Date: 23-Oct-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +import java.util.List; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class WeakReferenceMessageHandle implements AMQMessageHandle +{ + private ContentHeaderBody _contentHeaderBody; + + private List<ContentBody> _contentBodies; + + private boolean _redelivered; + + private final MessageStore _messageStore; + + public WeakReferenceMessageHandle(MessageStore messageStore, ContentHeaderBody contentHeader) + { + _messageStore = messageStore; + _contentHeaderBody = contentHeader; + } + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException + { + _contentHeaderBody = contentHeaderBody; + } + + public int getBodyCount() + { + return _contentBodies.size(); + } + + public long getBodySize() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public ContentBody getContentBody(int index) throws IllegalArgumentException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addContentBodyFrame(ContentBody contentBody) throws AMQException + { + _contentBodies.add(contentBody); + } + + public String getExchangeName() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public String getRoutingKey() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public boolean isPersistent() throws AMQException + { + if (_contentHeaderBody == null) + { + throw new AMQException("Cannot determine delivery mode of message. Content header not found."); + } + + //todo remove literal values to a constant file such as AMQConstants in common + return _contentHeaderBody.properties instanceof BasicContentHeaderProperties + && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java new file mode 100644 index 0000000000..bc0e993f83 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -0,0 +1,84 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.RequiredDeliveryException; + +import java.util.List; + +/** + * @author Apache Software Foundation + */ +public class CleanupMessageOperation implements TxnOp +{ + private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class); + + private final AMQMessage _msg; + + private final List<RequiredDeliveryException> _returns; + + public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns) + { + _msg = msg; + _returns = returns; + } + + public void prepare() throws AMQException + { + } + + public void undoPrepare() + { + //don't need to do anything here, if the store's txn failed + //when processing prepare then the message was not stored + //or enqueued on any queues and can be discarded + } + + public void commit() + { + //The routers reference can now be released. This is done + //here to ensure that it happens after the queues that + //enqueue it have incremented their counts (which as a + //memory only operation is done in the commit phase). + try + { + _msg.decrementReference(); + } + catch (AMQException e) + { + _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); + } + try + { + _msg.checkDeliveredToConsumer(); + } + catch (NoConsumersException e) + { + //TODO: store this for delivery after the commit-ok + _returns.add(e); + } + } + + public void rollback() + { + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java new file mode 100644 index 0000000000..baf2333751 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java @@ -0,0 +1,63 @@ +/** + * User: Robert Greig + * Date: 01-Nov-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.FailedDequeueException; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.log4j.Logger; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class DeliverMessageOperation implements TxnOp +{ + private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class); + + private final AMQMessage _msg; + + private final AMQQueue _queue; + + public DeliverMessageOperation(AMQMessage msg, AMQQueue queue) + { + _msg = msg; + _queue = queue; + } + + public void prepare() throws AMQException + { + //do the persistent part of the record() + _msg.enqueue(_queue); + } + + public void undoPrepare() + { + } + + public void commit() + { + //do the memeory part of the record() + _msg.incrementReference(); + //then process the message + try + { + _queue.process(_msg); + } + catch (FailedDequeueException e) + { + //TODO: is there anything else we can do here? I think not... + _logger.error("Error during commit of a queue delivery: " + e, e); + } + } + + public void rollback() + { + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java new file mode 100644 index 0000000000..9d04978b23 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -0,0 +1,118 @@ +/** + * User: Robert Greig + * Date: 01-Nov-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.ack.TxAck; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.RequiredDeliveryException; + +import java.util.List; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class LocalTransactionalContext implements TransactionalContext +{ + private final TxnBuffer _txnBuffer; + + /** + * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are + * consolidated into a single operation + */ + private TxAck _ackOp; + + private List<RequiredDeliveryException> _returnMessages; + + public LocalTransactionalContext(TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages) + { + _txnBuffer = txnBuffer; + _returnMessages = returnMessages; + } + + public void rollback() throws AMQException + { + _txnBuffer.rollback(); + } + + public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + { + // don't create a transaction unless needed + if (message.isPersistent()) + { + _txnBuffer.containsPersistentChanges(); + } + + // A publication will result in the enlisting of several + // TxnOps. The first is an op that will store the message. + // Following that (and ordering is important), an op will + // be added for every queue onto which the message is + // enqueued. Finally a cleanup op will be added to decrement + // the reference associated with the routing. + _txnBuffer.enlist(new StoreMessageOperation(message)); + _txnBuffer.enlist(new DeliverMessageOperation(message, queue)); + _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + } + + private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); + } + } + + public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + { + //check that the tag exists to give early failure + if (!multiple || deliveryTag > 0) + { + checkAck(deliveryTag, unacknowledgedMessageMap); + } + //we use a single txn op for all acks and update this op + //as new acks come in. If this is the first ack in the txn + //we will need to create and enlist the op. + if (_ackOp == null) + { + _ackOp = new TxAck(unacknowledgedMessageMap); + _txnBuffer.enlist(_ackOp); + } + // update the op to include this ack request + if (multiple && deliveryTag == 0) + { + // if have signalled to ack all, that refers only + // to all at this time + _ackOp.update(lastDeliveryTag, multiple); + } + else + { + _ackOp.update(deliveryTag, multiple); + } + } + + public void commit() throws AMQException + { + if (_ackOp != null) + { + _ackOp.consolidate(); + if (_ackOp.checkPersistent()) + { + _txnBuffer.containsPersistentChanges(); + } + //already enlisted, after commit will reset regardless of outcome + _ackOp = null; + } + + _txnBuffer.commit(); + //TODO: may need to return 'immediate' messages at this point + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java new file mode 100644 index 0000000000..381e92de92 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -0,0 +1,146 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; + +import java.util.LinkedList; +import java.util.List; + +/** + * @author Apache Software Foundation + */ +public class NonTransactionalContext implements TransactionalContext +{ + private static final Logger _log = Logger.getLogger(NonTransactionalContext.class); + + /** + * Channel is useful for logging + */ + private AMQChannel _channel; + + /** + * Where to put undeliverable messages + */ + private List<RequiredDeliveryException> _returnMessages; + + public NonTransactionalContext(AMQChannel channel, List<RequiredDeliveryException> returnMessages) + { + _channel = channel; + _returnMessages = returnMessages; + } + + public void commit() throws AMQException + { + // Does not apply to this context + } + + public void rollback() throws AMQException + { + // Does not apply to this context + } + + public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + { + try + { + queue.process(message); + //following check implements the functionality + //required by the 'immediate' flag: + message.checkDeliveredToConsumer(); + } + catch (NoConsumersException e) + { + _returnMessages.add(e); + } + finally + { + message.decrementReference(); + } + } + + public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag, + boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) + throws AMQException + { + if (multiple) + { + if (deliveryTag == 0) + { + + //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, + // tells the server to acknowledge all outstanding mesages. + _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + + unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + message.discard(); + return false; + } + + public void visitComplete() + { + unacknowledgedMessageMap.clear(); + } + }); + } + else + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + } + + LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); + unacknowledgedMessageMap.drainTo(acked, deliveryTag); + for (UnacknowledgedMessage msg : acked) + { + msg.discard(); + } + } + } + else + { + UnacknowledgedMessage msg; + msg = unacknowledgedMessageMap.remove(deliveryTag); + + if (msg == null) + { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + } + msg.discard(); + if (_log.isDebugEnabled()) + { + _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag); + } + } + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java new file mode 100644 index 0000000000..277bb66613 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java @@ -0,0 +1,47 @@ +/** + * User: Robert Greig + * Date: 01-Nov-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class StoreMessageOperation implements TxnOp +{ + //just use this to do a store of the message during the + //prepare phase. Any enqueueing etc is done by TxnOps enlisted + //by the queues themselves. + private final AMQMessage _msg; + + public StoreMessageOperation(AMQMessage msg) + { + _msg = msg; + } + + public void prepare() throws AMQException + { + _msg.storeMessage(); + // the router's reference can now be released + _msg.decrementReference(); + } + + public void undoPrepare() + { + } + + public void commit() + { + } + + public void rollback() + { + } +} diff --git a/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java new file mode 100644 index 0000000000..feaf498ad6 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java @@ -0,0 +1,29 @@ +/** + * User: Robert Greig + * Date: 01-Nov-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public interface TransactionalContext +{ + void commit() throws AMQException; + + void rollback() throws AMQException; + + void deliver(AMQMessage message, AMQQueue queue) throws AMQException; + + void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException; +} |