diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-07 23:11:53 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-07 23:11:53 +0000 |
commit | 26d0286ef84e00fd27206b5e23aff5c54309a975 (patch) | |
tree | b3ffd1ef57cbbc31faaf95466f91418421d44032 /java/broker/src/main | |
parent | 189816d88cc72f1053a7e7685b18883669c53d57 (diff) | |
download | qpid-python-26d0286ef84e00fd27206b5e23aff5c54309a975.tar.gz |
QPID-32: new model for holding and processing message in memory to support new persistent stores
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@493872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
58 files changed, 2562 insertions, 1182 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index c5b45659cf..999eb9f651 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,32 +22,27 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.LocalTransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.TxnBuffer; -import org.apache.qpid.server.txn.TxnOp; - -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.Set; -import java.util.HashSet; + +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -59,7 +54,7 @@ public class AMQChannel private final int _channelId; - private boolean _transactional; + //private boolean _transactional; private long _prefetch_HighWaterMark; @@ -97,21 +92,24 @@ public class AMQChannel private final MessageStore _messageStore; - private final Object _unacknowledgedMessageMapLock = new Object(); - - private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); - - private long _lastDeliveryTag; + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); private final AtomicBoolean _suspended = new AtomicBoolean(false); private final MessageRouter _exchanges; - private final TxnBuffer _txnBuffer; + private TransactionalContext _txnContext; + + /** + * A context used by the message store enabling it to track context for a given channel even across + * thread boundaries + */ + private final StoreContext _storeContext = new StoreContext(); + + private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - private TxAck ackOp; + private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); - private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); private Set<Long> _browsedAcks = new HashSet<Long>(); public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) @@ -122,22 +120,29 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _txnBuffer = new TxnBuffer(_messageStore); + // by default the session is non-transactional + _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } - public int getChannelId() + /** + * Sets this channel to be part of a local transaction + */ + public void setLocalTransactional() { - return _channelId; + _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages); } public boolean isTransactional() { - return _transactional; + // this does not look great but there should only be one "non-transactional" + // transactional context, while there could be several transactional ones in + // theory + return !(_txnContext instanceof NonTransactionalContext); } - public void setTransactional(boolean transactional) + public int getChannelId() { - _transactional = transactional; + return _channelId; } public long getPrefetchCount() @@ -173,7 +178,9 @@ public class AMQChannel public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException { - _currentMessage = new AMQMessage(_messageStore, publishBody); + _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody, + _txnContext); + // TODO: used in clustering only I think (RG) _currentMessage.setPublisher(publisher); } @@ -182,92 +189,60 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content header without previously receiving a BasicDeliver frame"); + throw new AMQException("Received content header without previously receiving a BasicPublish frame"); } else { _currentMessage.setContentHeaderBody(contentHeaderBody); - // check and route if header says body length is zero + routeCurrentMessage(); + _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); + + // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { - routeCurrentMessage(); + _currentMessage = null; } } } - public void publishContentBody(ContentBody contentBody) + public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException { if (_currentMessage == null) { throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_currentMessage.getContentHeaderBody() == null) + + // returns true iff the message was delivered (i.e. if all data was + // received + try { - throw new AMQException("Received content body without previously receiving a content header"); + if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) + { + // callback to allow the context to do any post message processing + // primary use is to allow message return processing in the non-tx case + _txnContext.messageProcessed(protocolSession); + _currentMessage = null; + } } - - _currentMessage.addContentBodyFrame(contentBody); - if (_currentMessage.isAllContentReceived()) + catch (AMQException e) { - routeCurrentMessage(); + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; } } protected void routeCurrentMessage() throws AMQException { - if (_transactional) + try { - //don't create a transaction unless needed - if (_currentMessage.isPersistent()) - { - _txnBuffer.containsPersistentChanges(); - } - - //A publication will result in the enlisting of several - //TxnOps. The first is an op that will store the message. - //Following that (and ordering is important), an op will - //be added for every queue onto which the message is - //enqueued. Finally a cleanup op will be added to decrement - //the reference associated with the routing. - Store storeOp = new Store(_currentMessage); - _txnBuffer.enlist(storeOp); - _currentMessage.setTxnBuffer(_txnBuffer); - try - { - _exchanges.routeContent(_currentMessage); - _txnBuffer.enlist(new Cleanup(_currentMessage)); - } - catch (RequiredDeliveryException e) - { - //Can only be due to the mandatory flag, as no attempt - //has yet been made to deliver the message. The - //message will thus not have been delivered to any - //queue so we can return the message (without killing - //the transaction) and for efficiency remove the store - //operation from the buffer. - _txnBuffer.cancel(storeOp); - throw e; - } - finally - { - _currentMessage = null; - } + _exchanges.routeContent(_currentMessage); } - else + catch (NoRouteException e) { - try - { - _exchanges.routeContent(_currentMessage); - //following check implements the functionality - //required by the 'immediate' flag: - _currentMessage.checkDeliveredToConsumer(); - } - finally - { - _currentMessage.decrementReference(); - _currentMessage = null; - } + _returnMessages.add(e); } } @@ -329,13 +304,7 @@ public class AMQChannel */ public void close(AMQProtocolSession session) throws AMQException { - if (_transactional) - { - synchronized(_txnBuffer) - { - _txnBuffer.rollback();//releases messages - } - } + _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); } @@ -353,41 +322,32 @@ public class AMQChannel /** * Add a message to the channel-based list of unacknowledged messages * - * @param message - * @param deliveryTag - * @param queue + * @param message the message that was delivered + * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of + * the delivery tag) + * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized(_unacknowledgedMessageMapLock) - { - _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); - _lastDeliveryTag = deliveryTag; - checkSuspension(); - } + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); + checkSuspension(); } /** * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. * May result in delivery to this same channel or to other subscribers. + * @throws org.apache.qpid.AMQException if the requeue fails */ public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Map<Long, UnacknowledgedMessage> currentList; - synchronized(_unacknowledgedMessageMapLock) - { - currentList = _unacknowledgedMessageMap; - _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); - } + Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - for (UnacknowledgedMessage unacked : currentList.values()) + for (UnacknowledgedMessage unacked : messagesToBeDelivered) { if (unacked.queue != null) { - unacked.message.setTxnBuffer(null); - - unacked.queue.deliver(unacked.message); + _txnContext.deliver(unacked.message, unacked.queue); } } } @@ -395,53 +355,62 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(AMQProtocolSession session) + public void resend(final AMQProtocolSession session) throws AMQException { - //messages go to this channel - synchronized(_unacknowledgedMessageMapLock) + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - long deliveryTag = entry.getKey(); - String consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; + long deliveryTag = message.deliveryTag; + String consumerTag = message.consumerTag; + AMQMessage msg = message.message; msg.setRedelivered(true); - session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + // false means continue processing + return false; } - } + + public void visitComplete() + { + } + }); } /** * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged * messages to remove the queue reference and also decrement any message reference counts, without - * actually removing the item sine we may get an ack for a delivery tag that was generated from the + * actually removing the item since we may get an ack for a delivery tag that was generated from the * deleted queue. * - * @param queue + * @param queue the queue that has been deleted + * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(final AMQQueue queue) throws AMQException { - synchronized(_unacknowledgedMessageMapLock) + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - final UnacknowledgedMessage unackedMsg = unacked.getValue(); - // we can compare the reference safely in this case - if (unackedMsg.queue == queue) + if (message.queue == queue) { - unackedMsg.queue = null; try { - unackedMsg.message.decrementReference(); + message.discard(_storeContext); + message.queue = null; } catch (AMQException e) { - _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + + _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); } } + return false; } - } + + public void visitComplete() + { + } + }); } /** @@ -454,150 +423,7 @@ public class AMQChannel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - if (_transactional) - { - //check that the tag exists to give early failure - if (!multiple || deliveryTag > 0) - { - checkAck(deliveryTag); - } - //we use a single txn op for all acks and update this op - //as new acks come in. If this is the first ack in the txn - //we will need to create and enlist the op. - if (ackOp == null) - { - ackOp = new TxAck(new AckMap()); - _txnBuffer.enlist(ackOp); - } - //update the op to include this ack request - if (multiple && deliveryTag == 0) - { - synchronized(_unacknowledgedMessageMapLock) - { - //if have signalled to ack all, that refers only - //to all at this time - ackOp.update(_lastDeliveryTag, multiple); - } - } - else - { - ackOp.update(deliveryTag, multiple); - } - } - else - { - handleAcknowledgement(deliveryTag, multiple); - } - } - - private void checkAck(long deliveryTag) throws AMQException - { - synchronized(_unacknowledgedMessageMapLock) - { - if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) - { - throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); - } - } - } - - private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException - { - if (_log.isDebugEnabled()) - { - _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + - " and multiple " + multiple); - } - if (multiple) - { - LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized(_unacknowledgedMessageMapLock) - { - if (deliveryTag == 0) - { - //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages. - _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); - acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values()); - _unacknowledgedMessageMap.clear(); - } - else - { - if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) - { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); - } - Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); - - while (i.hasNext()) - { - - Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); - - if (unacked.getKey() > deliveryTag) - { - //This should not occur now. - throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); - } - - i.remove(); - - acked.add(unacked.getValue()); - if (unacked.getKey() == deliveryTag) - { - break; - } - } - } - }// synchronized - - if (_log.isTraceEnabled()) - { - _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); - } - - for (UnacknowledgedMessage msg : acked) - { - if (!_browsedAcks.contains(deliveryTag)) - { - msg.discard(); - } - else - { - _browsedAcks.remove(deliveryTag); - } - } - - } - else - { - UnacknowledgedMessage msg; - synchronized(_unacknowledgedMessageMapLock) - { - msg = _unacknowledgedMessageMap.remove(deliveryTag); - } - - if (msg == null) - { - _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); - throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); - } - - if (!_browsedAcks.contains(deliveryTag)) - { - msg.discard(); - } - else - { - _browsedAcks.remove(deliveryTag); - } - - if (_log.isTraceEnabled()) - { - _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag); - } - } - + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); checkSuspension(); } @@ -606,19 +432,22 @@ public class AMQChannel * * @return the map of unacknowledged messages */ - public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap() + public UnacknowledgedMessageMap getUnacknowledgedMessageMap() { return _unacknowledgedMessageMap; } + public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue) + { + _browsedAcks.add(deliveryTag); + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + private void checkSuspension() { boolean suspend; - //noinspection SynchronizeOnNonFinalField - synchronized(_unacknowledgedMessageMapLock) - { - suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; - } + suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; + setSuspended(suspend); } @@ -628,11 +457,8 @@ public class AMQChannel if (isSuspended && !suspended) { - synchronized(_unacknowledgedMessageMapLock) - { - // Continue being suspended if we are above the _prefetch_LowWaterMark - suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; - } + // Continue being suspended if we are above the _prefetch_LowWaterMark + suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; } boolean wasSuspended = _suspended.getAndSet(suspended); @@ -661,33 +487,18 @@ public class AMQChannel public void commit() throws AMQException { - if (ackOp != null) - { - ackOp.consolidate(); - if (ackOp.checkPersistent()) - { - _txnBuffer.containsPersistentChanges(); - } - ackOp = null;//already enlisted, after commit will reset regardless of outcome - } - - _txnBuffer.commit(); - //TODO: may need to return 'immediate' messages at this point + _txnContext.commit(); } public void rollback() throws AMQException { - //need to protect rollback and close from each other... - synchronized(_txnBuffer) - { - _txnBuffer.rollback(); - } + _txnContext.rollback(); } public String toString() { StringBuilder sb = new StringBuilder(30); - sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional); + sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional()); sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); sb.append("/").append(_prefetch_HighWaterMark); return sb.toString(); @@ -703,121 +514,18 @@ public class AMQChannel return _defaultQueue; } - public void processReturns(AMQProtocolSession session) + public StoreContext getStoreContext() { - for (AMQDataBlock block : _returns) - { - session.writeFrame(block); - } - _returns.clear(); - } - - public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue) - { - _browsedAcks.add(deliveryTag); - addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + return _storeContext; } - //we use this wrapper to ensure we are always using the correct - //map instance (its not final unfortunately) - private class AckMap implements UnacknowledgedMessageMap + public void processReturns(AMQProtocolSession session) throws AMQException { - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) - { - impl().collect(deliveryTag, multiple, msgs); - } - - public void remove(List<UnacknowledgedMessage> msgs) - { - impl().remove(msgs); - } - - private UnacknowledgedMessageMap impl() - { - return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap); - } - } - - private class Store implements TxnOp - { - //just use this to do a store of the message during the - //prepare phase. Any enqueueing etc is done by TxnOps enlisted - //by the queues themselves. - private final AMQMessage _msg; - - Store(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - _msg.storeMessage(); - //the routers reference can now be released - _msg.decrementReference(); - } - - public void undoPrepare() - { - } - - public void commit() - { - } - - public void rollback() + for (RequiredDeliveryException bouncedMessage : _returnMessages) { + AMQMessage message = bouncedMessage.getAMQMessage(); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), bouncedMessage.getMessage()); } + _returnMessages.clear(); } - - private class Cleanup implements TxnOp - { - private final AMQMessage _msg; - - Cleanup(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - } - - public void undoPrepare() - { - //don't need to do anything here, if the store's txn failed - //when processing prepare then the message was not stored - //or enqueued on any queues and can be discarded - } - - public void commit() - { - //The routers reference can now be released. This is done - //here to ensure that it happens after the queues that - //enqueue it have incremented their counts (which as a - //memory only operation is done in the commit phase). - try - { - _msg.decrementReference(); - } - catch (AMQException e) - { - _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); - } - try - { - _msg.checkDeliveredToConsumer(); - } - catch (NoConsumersException e) - { - //TODO: store this for delivery after the commit-ok - _returns.add(e.getReturnMessage(_channelId)); - } - } - - public void rollback() - { - } - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 5e463646f9..b85e3603b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,17 +20,9 @@ */ package org.apache.qpid.server; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; -import java.util.List; - /** * Signals that a required delivery could not be made. This could be bacuse of * the immediate flag being set and the queue having no consumers, or the mandatory @@ -38,79 +30,24 @@ import java.util.List; */ public abstract class RequiredDeliveryException extends AMQException { - private final String _message; - private final BasicPublishBody _publishBody; - private final ContentHeaderBody _contentHeaderBody; - private final List<ContentBody> _contentBodies; + private final AMQMessage _amqMessage; public RequiredDeliveryException(String message, AMQMessage payload) { super(message); - _message = message; - _publishBody = payload.getPublishBody(); - _contentHeaderBody = payload.getContentHeaderBody(); - _contentBodies = payload.getContentBodies(); + _amqMessage = payload; + payload.incrementReference(); } - public RequiredDeliveryException(String message, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public AMQMessage getAMQMessage() { - super(message); - _message = message; - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; - _contentBodies = contentBodies; - } - - public BasicPublishBody getPublishBody() - { - return _publishBody; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public List<ContentBody> getContentBodies() - { - return _contentBodies; - } - - public CompositeAMQDataBlock getReturnMessage(int channel) - { - // AMQP version change: All generated *Body classes are now version-aware. - // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now. - // TODO: Connect the version to that returned by the ProtocolInitiation - // for this session. - BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0); - returnBody.exchange = _publishBody.exchange; - returnBody.replyCode = getReplyCode(); - returnBody.replyText = _message; - returnBody.routingKey = _publishBody.routingKey; - - AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - - AMQFrame returnFrame = new AMQFrame(); - returnFrame.bodyFrame = returnBody; - returnFrame.channel = channel; - - allFrames[0] = returnFrame; - allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 2; i < allFrames.length; i++) - { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2)); - } - - return new CompositeAMQDataBlock(allFrames); + return _amqMessage; } public int getErrorCode() { return getReplyCode(); - } + } public abstract int getReplyCode(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index a204e176aa..c0a631080e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,13 +22,14 @@ package org.apache.qpid.server.ack; import org.apache.qpid.AMQException; import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; import java.util.List; /** * A TxnOp implementation for handling accumulated acks - */ + */ public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; @@ -44,14 +45,14 @@ public class TxAck implements TxnOp public void update(long deliveryTag, boolean multiple) { - if(!multiple) + if (!multiple) { //have acked a single message that is not part of //the previously acked region so record //individually _individual.add(deliveryTag);//_multiple && !multiple } - else if(deliveryTag > _deliveryTag) + else if (deliveryTag > _deliveryTag) { //have simply moved the last acked message on a //bit @@ -63,7 +64,7 @@ public class TxAck implements TxnOp public void consolidate() { //lookup all the unacked messages that have been acked in this transaction - if(_multiple) + if (_multiple) { //get all the unacked messages for the accumulated //multiple acks @@ -71,22 +72,22 @@ public class TxAck implements TxnOp } //get any unacked messages for individual acks outside the //range covered by multiple acks - for(long tag : _individual) + for (long tag : _individual) { if(_deliveryTag < tag) { - _map.collect(tag, false, _unacked); + _map.collect(tag, false, _unacked); } } } public boolean checkPersistent() throws AMQException - { + { //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: - for(UnacknowledgedMessage msg : _unacked) + for (UnacknowledgedMessage msg : _unacked) { - if(msg.message.isPersistent()) + if (msg.message.isPersistent()) { return true; } @@ -94,34 +95,34 @@ public class TxAck implements TxnOp return false; } - public void prepare() throws AMQException + public void prepare(StoreContext storeContext) throws AMQException { //make persistent changes, i.e. dequeue and decrementReference - for(UnacknowledgedMessage msg : _unacked) + for (UnacknowledgedMessage msg : _unacked) { - msg.discard(); + msg.discard(storeContext); } } - + public void undoPrepare() { //decrementReference is annoyingly untransactional (due to //in memory counter) so if we failed in prepare for full //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for(UnacknowledgedMessage msg : _unacked) + //in memory (persistent changes will be rolled back by store) + for (UnacknowledgedMessage msg : _unacked) { msg.message.incrementReference(); - } + } } - public void commit() + public void commit(StoreContext storeContext) { //remove the unacked messages from the channels map _map.remove(_unacked); } - public void rollback() + public void rollback(StoreContext storeContext) { } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 0eff5a3cca..26f41e19af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ package org.apache.qpid.server.ack; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; public class UnacknowledgedMessage { @@ -30,7 +31,7 @@ public class UnacknowledgedMessage public final String consumerTag; public final long deliveryTag; public AMQQueue queue; - + public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag) { this.queue = queue; @@ -39,13 +40,13 @@ public class UnacknowledgedMessage this.deliveryTag = deliveryTag; } - public void discard() throws AMQException + public void discard(StoreContext storeContext) throws AMQException { if (queue != null) { - message.dequeue(queue); + message.dequeue(storeContext, queue); } - message.decrementReference(); + message.decrementReference(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index b0bbe224e3..b7a75d5b71 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,11 +20,55 @@ */ package org.apache.qpid.server.ack; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.txn.TransactionalContext; + +import java.util.Collection; import java.util.List; +import java.util.Set; public interface UnacknowledgedMessageMap { - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); - public void remove(List<UnacknowledgedMessage> msgs); + public interface Visitor + { + /** + * @param message the message being iterated over + * @return true to stop iteration, false to continue + * @throws AMQException + */ + boolean callback(UnacknowledgedMessage message) throws AMQException; + + void visitComplete(); + } + + void visit(Visitor visitor) throws AMQException; + + void add(long deliveryTag, UnacknowledgedMessage message); + + void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); + + boolean contains(long deliveryTag) throws AMQException; + + void remove(List<UnacknowledgedMessage> msgs); + + UnacknowledgedMessage remove(long deliveryTag); + + void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException; + + Collection<UnacknowledgedMessage> cancelAllMessages(); + + void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException; + + int size(); + + void clear(); + + UnacknowledgedMessage get(long deliveryTag); + + /** + * Get the set of delivery tags that are outstanding. + * @return a set of delivery tags + */ + Set<Long> getDeliveryTags(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index eda3233e56..1f4333549a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,19 +20,34 @@ */ package org.apache.qpid.server.ack; -import java.util.List; -import java.util.Map; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.AMQException; + +import java.util.*; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { - private final Object _lock; + private final Object _lock = new Object(); + private Map<Long, UnacknowledgedMessage> _map; - public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) + private long _lastDeliveryTag; + + private final int _prefetchLimit; + + public UnacknowledgedMessageMapImpl(int prefetchLimit) + { + _prefetchLimit = prefetchLimit; + _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit); + } + + /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) { _lock = lock; _map = map; - } + } */ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) { @@ -47,28 +62,151 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } + public boolean contains(long deliveryTag) throws AMQException + { + synchronized (_lock) + { + return _map.containsKey(deliveryTag); + } + } + public void remove(List<UnacknowledgedMessage> msgs) { - synchronized(_lock) + synchronized (_lock) { for(UnacknowledgedMessage msg : msgs) { _map.remove(msg.deliveryTag); - } + } + } + } + + public UnacknowledgedMessage remove(long deliveryTag) + { + synchronized (_lock) + { + return _map.remove(deliveryTag); + } + } + + public void visit(Visitor visitor) throws AMQException + { + synchronized (_lock) + { + Collection<UnacknowledgedMessage> currentEntries = _map.values(); + for (UnacknowledgedMessage msg: currentEntries) + { + visitor.callback(msg); + } + visitor.visitComplete(); + } + } + + public void add(long deliveryTag, UnacknowledgedMessage message) + { + synchronized( _lock) + { + _map.put(deliveryTag, message); + _lastDeliveryTag = deliveryTag; + } + } + + public Collection<UnacknowledgedMessage> cancelAllMessages() + { + synchronized (_lock) + { + Collection<UnacknowledgedMessage> currentEntries = _map.values(); + _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit); + return currentEntries; + } + } + + public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) + throws AMQException + { + synchronized (_lock) + { + txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this); + } + } + + public int size() + { + synchronized (_lock) + { + return _map.size(); + } + } + + public void clear() + { + synchronized (_lock) + { + _map.clear(); + } + } + + public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException + { + synchronized (_lock) + { + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry<Long, UnacknowledgedMessage> unacked = it.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); + } + + it.remove(); + + destination.add(unacked.getValue()); + if (unacked.getKey() == deliveryTag) + { + break; + } + } + } + } + + public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException + { + synchronized (_lock) + { + for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) + { + long deliveryTag = entry.getKey(); + String consumerTag = entry.getValue().consumerTag; + AMQMessage msg = entry.getValue().message; + + msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + } } } - private UnacknowledgedMessage get(long key) + public UnacknowledgedMessage get(long key) { - synchronized(_lock) + synchronized (_lock) { return _map.get(key); } } + public Set<Long> getDeliveryTags() + { + synchronized (_lock) + { + return _map.keySet(); + } + } + private void collect(long key, List<UnacknowledgedMessage> msgs) { - synchronized(_lock) + synchronized (_lock) { for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) { @@ -76,9 +214,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if (entry.getKey() == key) { break; - } + } } } } } - diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index eb9d1acb59..99c08ad200 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.exchange; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.log4j.Logger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -84,8 +84,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry final String exchange = payload.getPublishBody().exchange; final Exchange exch = _exchangeMap.get(exchange); // there is a small window of opportunity for the exchange to be deleted in between - // the JmsPublish being received (where the exchange is validated) and the final + // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) + // TODO: check where the exchange is validated if (exch == null) { throw new AMQException("Exchange '" + exchange + "' does not exist"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index d4069fa315..7b28161263 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -168,8 +168,7 @@ public class DestNameExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - BasicPublishBody publishBody = payload.getPublishBody(); - + final BasicPublishBody publishBody = payload.getPublishBody(); final String routingKey = publishBody.routingKey; final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (queues == null || queues.isEmpty()) @@ -193,7 +192,7 @@ public class DestNameExchange extends AbstractExchange for (AMQQueue q : queues) { - q.deliver(payload); + payload.enqueue(q); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 139307488e..c341f30ab6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -171,7 +171,7 @@ public class DestWildExchange extends AbstractExchange // TODO: modify code generator to add clone() method then clone the deliver body // without this addition we have a race condition - we will be modifying the body // before the encoder has encoded the body for delivery - q.deliver(payload); + payload.enqueue(q); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 824e85dc5c..8ef5f0ab29 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; public interface Exchange { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 229502d2a6..dcb64e2d30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -145,7 +145,7 @@ public class HeadersExchange extends AbstractExchange * <attributename>=<value>,<attributename>=<value>,... * @param queueName * @param binding - * @throws JMException + * @throws javax.management.JMException */ public void createNewBinding(String queueName, String binding) throws JMException { @@ -192,7 +192,7 @@ public class HeadersExchange extends AbstractExchange { _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } - boolean delivered = false; + boolean routed = false; for (Registration e : _bindings) { if (e.binding.matches(headers)) @@ -202,11 +202,11 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": delivering message with headers " + headers + " to " + e.queue.getName()); } - e.queue.deliver(payload); - delivered = true; + payload.enqueue(e.queue); + routed = true; } } - if (!delivered) + if (!routed) { String msg = "Exchange " + getName() + ": message not routable."; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java index 70b80f65da..7508e80f7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; /** * Separated out from the ExchangeRegistry interface to allow components @@ -33,6 +33,7 @@ public interface MessageRouter /** * Routes content through exchanges, delivering it to 1 or more queues. * @param message the message to be routed + * * @throws org.apache.qpid.AMQException if something goes wrong delivering data */ void routeContent(AMQMessage message) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java index 0aa5739c1c..c536f77dde 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; import javax.jms.JMSException; @@ -58,7 +59,8 @@ public abstract class ArithmeticExpression extends BinaryExpression { throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); } - public String getExpressionSymbol() { + public String getExpressionSymbol() + { return "+"; } }; @@ -193,7 +195,8 @@ public abstract class ArithmeticExpression extends BinaryExpression { } } - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException + { Object lvalue = left.evaluate(message); if (lvalue == null) { return null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java index b66de3fbc5..de71e95049 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import javax.jms.JMSException; @@ -33,13 +34,14 @@ import javax.jms.JMSException; * * @version $Revision$ */ -public interface BooleanExpression extends Expression { - +public interface BooleanExpression extends Expression +{ + /** * @param message * @return true if the expression evaluates to Boolean.TRUE. * @throws JMSException */ - public boolean matches(AMQMessage message) throws JMSException; + public boolean matches(AMQMessage message) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java index 13d278cf65..07391098ce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import java.util.HashSet; import java.util.List; @@ -123,7 +124,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B /** * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) */ - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Object rv = this.getRight().evaluate(message); @@ -139,7 +140,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; } - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } @@ -199,7 +200,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B private static BooleanExpression doCreateEqual(Expression left, Expression right) { return new ComparisonExpression(left, right) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Object lv = left.evaluate(message); Object rv = right.evaluate(message); @@ -340,7 +341,8 @@ public abstract class ComparisonExpression extends BinaryExpression implements B super(left, right); } - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException + { Comparable lv = (Comparable) left.evaluate(message); if (lv == null) { return null; @@ -457,7 +459,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B protected abstract boolean asBoolean(int answer); - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java index 9bde712da2..2cd305d4b1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import java.math.BigDecimal; @@ -29,19 +30,24 @@ import javax.jms.JMSException; /** * Represents a constant expression - * + * * @version $Revision$ */ -public class ConstantExpression implements Expression { +public class ConstantExpression implements Expression +{ - static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression { - public BooleanConstantExpression(Object value) { + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression + { + public BooleanConstantExpression(Object value) + { super(value); } - public boolean matches(AMQMessage message) throws JMSException { + + public boolean matches(AMQMessage message) throws AMQException + { Object object = evaluate(message); - return object!=null && object==Boolean.TRUE; - } + return object != null && object == Boolean.TRUE; + } } public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); @@ -50,73 +56,92 @@ public class ConstantExpression implements Expression { private Object value; - public static ConstantExpression createFromDecimal(String text) { - - // Strip off the 'l' or 'L' if needed. - if( text.endsWith("l") || text.endsWith("L") ) - text = text.substring(0, text.length()-1); - - Number value; - try { - value = new Long(text); - } catch ( NumberFormatException e) { - // The number may be too big to fit in a long. - value = new BigDecimal(text); - } - + public static ConstantExpression createFromDecimal(String text) + { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) + { + text = text.substring(0, text.length() - 1); + } + + Number value; + try + { + value = new Long(text); + } + catch (NumberFormatException e) + { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + long l = value.longValue(); - if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { value = new Integer(value.intValue()); } return new ConstantExpression(value); } - public static ConstantExpression createFromHex(String text) { + public static ConstantExpression createFromHex(String text) + { Number value = new Long(Long.parseLong(text.substring(2), 16)); long l = value.longValue(); - if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { value = new Integer(value.intValue()); } return new ConstantExpression(value); } - public static ConstantExpression createFromOctal(String text) { + public static ConstantExpression createFromOctal(String text) + { Number value = new Long(Long.parseLong(text, 8)); long l = value.longValue(); - if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { value = new Integer(value.intValue()); } return new ConstantExpression(value); } - public static ConstantExpression createFloat(String text) { + public static ConstantExpression createFloat(String text) + { Number value = new Double(text); return new ConstantExpression(value); } - public ConstantExpression(Object value) { + public ConstantExpression(Object value) + { this.value = value; } - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException + { return value; } - public Object getValue() { + public Object getValue() + { return value; - } + } /** * @see java.lang.Object#toString() */ - public String toString() { - if (value == null) { + public String toString() + { + if (value == null) + { return "NULL"; } - if (value instanceof Boolean) { + if (value instanceof Boolean) + { return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; } - if (value instanceof String) { + if (value instanceof String) + { return encodeString((String) value); } return value.toString(); @@ -127,7 +152,8 @@ public class ConstantExpression implements Expression { * * @see java.lang.Object#hashCode() */ - public int hashCode() { + public int hashCode() + { return toString().hashCode(); } @@ -136,9 +162,11 @@ public class ConstantExpression implements Expression { * * @see java.lang.Object#equals(java.lang.Object) */ - public boolean equals(Object o) { + public boolean equals(Object o) + { - if (o == null || !this.getClass().equals(o.getClass())) { + if (o == null || !this.getClass().equals(o.getClass())) + { return false; } return toString().equals(o.toString()); @@ -153,12 +181,15 @@ public class ConstantExpression implements Expression { * @param s * @return */ - public static String encodeString(String s) { + public static String encodeString(String s) + { StringBuffer b = new StringBuffer(); b.append('\''); - for (int i = 0; i < s.length(); i++) { + for (int i = 0; i < s.length(); i++) + { char c = s.charAt(i); - if (c == '\'') { + if (c == '\'') + { b.append(c); } b.append(c); @@ -166,5 +197,5 @@ public class ConstantExpression implements Expression { b.append('\''); return b.toString(); } - + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java index a15c15fb91..3b5debd3ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import javax.jms.JMSException; @@ -32,11 +33,12 @@ import javax.jms.JMSException; * * @version $Revision$ */ -public interface Expression { +public interface Expression +{ /** * @return the value of this expression */ - public Object evaluate(AMQMessage message) throws JMSException; + public Object evaluate(AMQMessage message) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 4884067237..5f505fbeba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -64,7 +64,7 @@ public class JMSSelectorFilter implements MessageFilter _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); return match; } - catch (JMSException e) + catch (AMQException e) { //fixme this needs to be sorted.. it shouldn't happen e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java index 714d8c23f5..e6ad98cb8b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import javax.jms.JMSException; @@ -35,7 +36,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) { return new LogicExpression(lvalue, rvalue) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Boolean lv = (Boolean) left.evaluate(message); // Can we do an OR shortcut?? @@ -56,7 +57,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) { return new LogicExpression(lvalue, rvalue) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Boolean lv = (Boolean) left.evaluate(message); @@ -85,9 +86,9 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea super(left, right); } - abstract public Object evaluate(AMQMessage message) throws JMSException; + abstract public Object evaluate(AMQMessage message) throws AMQException; - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java index 283d324ff6..47ca930d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java @@ -20,15 +20,9 @@ */ package org.apache.qpid.server.filter; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.filter.jms.selector.SelectorParser; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidSelectorException; import org.apache.log4j.Logger; - - -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; public class NoConsumerFilter implements MessageFilter { @@ -36,7 +30,7 @@ public class NoConsumerFilter implements MessageFilter public NoConsumerFilter() throws AMQException - { + { _logger.info("Created NoConsumerFilter"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index f3e9965c2e..7d6a98df84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -47,7 +47,7 @@ public class PropertyExpression implements Expression interface SubExpression { - public Object evaluate(AMQMessage message); + public Object evaluate(AMQMessage message) throws AMQException; } interface JMSExpression @@ -65,7 +65,7 @@ public class PropertyExpression implements Expression } - public Object evaluate(AMQMessage message) + public Object evaluate(AMQMessage message) throws AMQException { JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE); if (msg != null) @@ -226,7 +226,7 @@ public class PropertyExpression implements Expression jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name); } - public Object evaluate(AMQMessage message) throws JMSException + public Object evaluate(AMQMessage message) throws AMQException { // try // { diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java index 49ff147411..abc56f04d0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; import java.math.BigDecimal; import java.util.Collection; @@ -43,7 +44,8 @@ public abstract class UnaryExpression implements Expression { public static Expression createNegate(Expression left) { return new UnaryExpression(left) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException + { Object rvalue = right.evaluate(message); if (rvalue == null) { return null; @@ -74,7 +76,7 @@ public abstract class UnaryExpression implements Expression { final Collection inList = t; return new BooleanUnaryExpression(right) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Object rvalue = right.evaluate(message); if (rvalue == null) { @@ -126,7 +128,7 @@ public abstract class UnaryExpression implements Expression { super(left); } - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } @@ -135,7 +137,7 @@ public abstract class UnaryExpression implements Expression { public static BooleanExpression createNOT(BooleanExpression left) { return new BooleanUnaryExpression(left) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Boolean lvalue = (Boolean) right.evaluate(message); if (lvalue == null) { return null; @@ -159,7 +161,7 @@ public abstract class UnaryExpression implements Expression { public static BooleanExpression createBooleanCast(Expression left) { return new BooleanUnaryExpression(left) { - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { Object rvalue = right.evaluate(message); if (rvalue == null) return null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java index ab952b6fea..85402e0781 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java @@ -31,6 +31,7 @@ import javax.jms.JMSException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; /** * Used to evaluate an XPath Expression in a JMS selector. @@ -75,7 +76,7 @@ public final class XPathExpression implements BooleanExpression { private final XPathEvaluator evaluator; static public interface XPathEvaluator { - public boolean evaluate(AMQMessage message) throws JMSException; + public boolean evaluate(AMQMessage message) throws AMQException; } XPathExpression(String xpath) { @@ -97,7 +98,7 @@ public final class XPathExpression implements BooleanExpression { } } - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { // try { //FIXME this is flow to disk work // if( message.isDropped() ) @@ -122,7 +123,8 @@ public final class XPathExpression implements BooleanExpression { * @return true if the expression evaluates to Boolean.TRUE. * @throws JMSException */ - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException + { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java index 53764cbf75..da8a61650a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java @@ -18,6 +18,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; import javax.jms.JMSException; // @@ -35,7 +36,7 @@ public final class XQueryExpression implements BooleanExpression { this.xpath = xpath; } - public Object evaluate(AMQMessage message) throws JMSException { + public Object evaluate(AMQMessage message) throws AMQException { return Boolean.FALSE; } @@ -48,7 +49,8 @@ public final class XQueryExpression implements BooleanExpression { * @return true if the expression evaluates to Boolean.TRUE. * @throws JMSException */ - public boolean matches(AMQMessage message) throws JMSException { + public boolean matches(AMQMessage message) throws AMQException + { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java index 4b78fd18df..f74e0cedec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java @@ -34,6 +34,7 @@ import javax.xml.parsers.DocumentBuilderFactory; //import org.apache.activemq.util.ByteArrayInputStream; import org.apache.xpath.CachedXPathAPI; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; import org.w3c.dom.Document; import org.w3c.dom.traversal.NodeIterator; import org.xml.sax.InputSource; @@ -46,17 +47,26 @@ public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator { this.xpath = xpath; } - public boolean evaluate(AMQMessage m) throws JMSException { - if( m instanceof TextMessage ) { - String text = ((TextMessage)m).getText(); - return evaluate(text); - } else if ( m instanceof BytesMessage ) { - BytesMessage bm = (BytesMessage) m; - byte data[] = new byte[(int) bm.getBodyLength()]; - bm.readBytes(data); - return evaluate(data); - } - return false; + public boolean evaluate(AMQMessage m) throws AMQException + { + try + { + + if( m instanceof TextMessage ) { + String text = ((TextMessage)m).getText(); + return evaluate(text); + } else if ( m instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) m; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + return evaluate(data); + } + return false; + } + catch (JMSException e) + { + throw new AMQException("Error evaluting message: " + e, e); + } } private boolean evaluate(byte[] data) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 7fcad5bbf3..c67b86af09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -49,15 +49,18 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> AMQMethodEvent<TxCommitBody> evt) throws AMQException { - try{ + try + { AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); - channel.processReturns(protocolSession); - }catch(AMQException e){ + channel.processReturns(protocolSession); + } + catch(AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 7df3825d8a..f4738ff01b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -47,7 +47,7 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<TxSelectBody> evt) throws AMQException { - protocolSession.getChannel(evt.getChannelId()).setTransactional(true); + protocolSession.getChannel(evt.getChannelId()).setLocalTransactional(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java index 72e241ea0a..376f88cbf1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; import javax.jms.Message; import javax.jms.JMSException; @@ -37,7 +38,7 @@ public class JMSMessage implements MessageDecorator private AMQMessage _message; private BasicContentHeaderProperties _properties; - public JMSMessage(AMQMessage message) + public JMSMessage(AMQMessage message) throws AMQException { _message = message; ContentHeaderBody contentHeader = message.getContentHeaderBody(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 831117d2c6..08668f0f6a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -40,7 +40,6 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -201,17 +200,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } else { - try - { - contentFrameReceived(frame); - } - catch (RequiredDeliveryException e) - { - //need to return the message: - _logger.info("Returning message to " + this + " channel " + frame.channel - + ": " + e.getMessage()); - writeFrame(e.getReturnMessage(frame.channel)); - } + contentFrameReceived(frame); } } } @@ -287,7 +276,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); + getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame, this); } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index afe4ea95b9..f30667690f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -21,19 +21,17 @@ package org.apache.qpid.server.queue; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.txn.TxnBuffer; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.message.jms.JMSMessage; -import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; -import java.util.Set; -import java.util.HashSet; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; @@ -43,45 +41,28 @@ import java.util.concurrent.ConcurrentHashMap; */ public class AMQMessage { + private static final Logger _log = Logger.getLogger(AMQMessage.class); + public static final String JMS_MESSAGE = "jms.message"; + /** + * Used in clustering + */ private final Set<Object> _tokens = new HashSet<Object>(); + /** + * Only use in clustering - should ideally be removed? + */ private AMQProtocolSession _publisher; - private final BasicPublishBody _publishBody; - - private ContentHeaderBody _contentHeaderBody; - - private List<ContentBody> _contentBodies; - - private boolean _redelivered; - private final long _messageId; private final AtomicInteger _referenceCount = new AtomicInteger(1); - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; + private AMQMessageHandle _messageHandle; - /** - * The message store in which this message is contained. - */ - private transient final MessageStore _store; - - /** - * For non transactional publishes, a message can be stored as - * soon as it is complete. For transactional messages it doesnt - * need to be stored until the transaction is committed. - */ - private boolean _storeWhenComplete; - - /** - * TxnBuffer for transactionally published messages - */ - private TxnBuffer _txnBuffer; + // TODO: ideally this should be able to go into the transient message date - check this! (RG) + private TransactionalContext _txnContext; /** * Flag to indicate whether message has been delivered to a @@ -89,178 +70,245 @@ public class AMQMessage * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; + private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; - private AtomicBoolean _taken; + private AtomicBoolean _taken = new AtomicBoolean(false); + private TransientMessageData _transientMessageData = new TransientMessageData(); - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) + /** + * Used to iterate through all the body frames associated with this message. Will not + * keep all the data in memory therefore is memory-efficient. + */ + private class BodyFrameIterator implements Iterator<AMQDataBlock> { - this(messageStore, publishBody, true); - } + private int _channel; - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete) - { - _messageId = messageStore.getNewMessageId(); - _publishBody = publishBody; - _store = messageStore; - _contentBodies = new LinkedList<ContentBody>(); - _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); - _storeWhenComplete = storeWhenComplete; - _taken = new AtomicBoolean(false); - } + private int _index = -1; - public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) - throws AMQException + private BodyFrameIterator(int channel) + { + _channel = channel; + } - { - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; - _contentBodies = contentBodies; - _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); - _messageId = messageId; - _store = store; - storeMessage(); - } + public boolean hasNext() + { + try + { + return _index < _messageHandle.getBodyCount(_messageId) - 1; + } + catch (AMQException e) + { + _log.error("Unable to get body count: " + e, e); + return false; + } + } - public AMQMessage(MessageStore store, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) - throws AMQException - { - this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); - } + public AMQDataBlock next() + { + try + { + ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index); + return ContentBody.createAMQFrame(_channel, cb); + } + catch (AMQException e) + { + // have no choice but to throw a runtime exception + throw new RuntimeException("Error getting content body: " + e, e); + } - protected AMQMessage(AMQMessage msg) throws AMQException - { - this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies); - } + } - public void storeMessage() throws AMQException - { - if (isPersistent()) + public void remove() { - _store.put(this); + throw new UnsupportedOperationException(); } } - public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel) + private class BodyContentIterator implements Iterator<ContentBody> { - AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()]; - allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 1; i < allFrames.length; i++) + private int _index = -1; + + public boolean hasNext() { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1)); + try + { + return _index < _messageHandle.getBodyCount(_messageId) - 1; + } + catch (AMQException e) + { + _log.error("Error getting body count: " + e, e); + return false; + } } - return new CompositeAMQDataBlock(encodedDeliverBody, allFrames); - } - public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) - { - - AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; + public ContentBody next() + { + try + { + return _messageHandle.getContentBody(_messageId, ++_index); + } + catch (AMQException e) + { + throw new RuntimeException("Error getting content body: " + e, e); + } + } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - allFrames[0] = BasicDeliverBody.createAMQFrame(channel, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - getExchangeName(), // exchange - _redelivered, // redelivered - getRoutingKey() // routingKey - ); - allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 2; i < allFrames.length; i++) + public void remove() { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2)); + throw new UnsupportedOperationException(); } - return new CompositeAMQDataBlock(allFrames); } - public List<AMQBody> getPayload() + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext) { - List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size()); - payload.add(_publishBody); - payload.add(_contentHeaderBody); - payload.addAll(_contentBodies); - return payload; + _messageId = messageId; + _txnContext = txnContext; + _transientMessageData.setPublishBody(publishBody); + _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); + _taken = new AtomicBoolean(false); + if (_log.isDebugEnabled()) + { + _log.debug("Message created with id " + messageId); + } } - public BasicPublishBody getPublishBody() + /** + * Used when recovering, i.e. when the message store is creating references to messages. + * In that case, the normal enqueue/routingComplete is not done since the recovery process + * is responsible for routing the messages to queues. + * @param messageId + * @param store + * @param factory + * @throws AMQException + */ + public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException { - return _publishBody; + _messageId = messageId; + _messageHandle = factory.createMessageHandle(messageId, store, true); + _transientMessageData = null; } - public ContentHeaderBody getContentHeaderBody() + /** + * Used in testing only. This allows the passing of the content header immediately + * on construction. + * @param messageId + * @param publishBody + * @param txnContext + * @param contentHeader + */ + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException { - return _contentHeaderBody; + this(messageId, publishBody, txnContext); + setContentHeaderBody(contentHeader); } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException - { - _contentHeaderBody = contentHeaderBody; - if (_storeWhenComplete && isAllContentReceived()) + /** + * Used in testing only. This allows the passing of the content header and some body fragments on + * construction. + * @param messageId + * @param publishBody + * @param txnContext + * @param contentHeader + * @param destinationQueues + * @param contentBodies + * @throws AMQException + */ + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext, + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, + List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext, + MessageHandleFactory messageHandleFactory) throws AMQException + { + this(messageId, publishBody, txnContext, contentHeader); + _transientMessageData.setDestinationQueues(destinationQueues); + routingComplete(messageStore, storeContext, messageHandleFactory); + for (ContentBody cb : contentBodies) { - storeMessage(); + addContentBodyFrame(storeContext, cb); } } - public List<ContentBody> getContentBodies() + protected AMQMessage(AMQMessage msg) throws AMQException { - return _contentBodies; + _messageId = msg._messageId; + _messageHandle = msg._messageHandle; + _txnContext = msg._txnContext; + _deliveredToConsumer = msg._deliveredToConsumer; + _transientMessageData = msg._transientMessageData; } - public void setContentBodies(List<ContentBody> contentBodies) + public Iterator<AMQDataBlock> getBodyFrameIterator(int channel) { - _contentBodies = contentBodies; + return new BodyFrameIterator(channel); } - public void addContentBodyFrame(ContentBody contentBody) throws AMQException + public Iterator<ContentBody> getContentBodyIterator() { - _contentBodies.add(contentBody); - _bodyLengthReceived += contentBody.getSize(); - if (_storeWhenComplete && isAllContentReceived()) - { - storeMessage(); - } + return new BodyContentIterator(); } - public boolean isAllContentReceived() + public ContentHeaderBody getContentHeaderBody() throws AMQException { - return _bodyLengthReceived == _contentHeaderBody.bodySize; + if (_transientMessageData != null) + { + return _transientMessageData.getContentHeaderBody(); + } + else + { + return _messageHandle.getContentHeaderBody(_messageId); + } } - - public boolean isRedelivered() + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) + throws AMQException { - return _redelivered; + _transientMessageData.setContentHeaderBody(contentHeaderBody); } - String getExchangeName() + public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException { - return _publishBody.exchange; - } + final boolean persistent = isPersistent(); + _messageHandle = factory.createMessageHandle(_messageId, store, persistent); + if (persistent) + { + _txnContext.beginTranIfNecessary(); + } - String getRoutingKey() - { - return _publishBody.routingKey; - } + // enqueuing the messages ensure that if required the destinations are recorded to a + // persistent store + for (AMQQueue q : _transientMessageData.getDestinationQueues()) + { + _messageHandle.enqueue(storeContext, _messageId, q); + } - boolean isImmediate() - { - return _publishBody.immediate; + if (_transientMessageData.getContentHeaderBody().bodySize == 0) + { + deliver(storeContext); + } } - NoConsumersException getNoConsumersException(String queue) + public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException { - return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies); + _transientMessageData.addBodyLength(contentBody.getSize()); + _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody); + if (isAllContentReceived()) + { + deliver(storeContext); + return true; + } + else + { + return false; + } } - public void setRedelivered(boolean redelivered) + public boolean isAllContentReceived() throws AMQException { - _redelivered = redelivered; + return _transientMessageData.isAllContentReceived(); } public long getMessageId() @@ -274,13 +322,20 @@ public class AMQMessage public void incrementReference() { _referenceCount.incrementAndGet(); + if (_log.isDebugEnabled()) + { + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount); + } } /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. + * + * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed */ - public void decrementReference() throws MessageCleanupException + public void decrementReference(StoreContext storeContext) throws MessageCleanupException { // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after @@ -290,7 +345,16 @@ public class AMQMessage { try { - _store.removeMessage(_messageId); + if (_log.isDebugEnabled()) + { + _log.debug("Ref count on message " + _messageId + " is zero; removing message"); + } + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_messageHandle != null) + { + _messageHandle.removeMessage(storeContext, _messageId); + } } catch (AMQException e) { @@ -299,6 +363,17 @@ public class AMQMessage throw new MessageCleanupException(_messageId, e); } } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId); + if (_referenceCount.get() < 0) + { + Thread.dumpStack(); + } + } + } } public void setPublisher(AMQProtocolSession publisher) @@ -311,6 +386,60 @@ public class AMQMessage return _publisher; } + /** + * Called selectors to determin if the message has already been sent + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return _deliveredToConsumer; + } + + + public MessageDecorator getDecodedMessage(String type) throws AMQException + { + MessageDecorator msgtype = null; + + if (_decodedMessages != null) + { + msgtype = _decodedMessages.get(type); + + if (msgtype == null) + { + msgtype = decorateMessage(type); + } + } + + return msgtype; + } + + private MessageDecorator decorateMessage(String type) throws AMQException + { + MessageDecorator msgdec = null; + + if (type.equals(JMS_MESSAGE)) + { + msgdec = new JMSMessage(this); + } + + if (msgdec != null) + { + _decodedMessages.put(type, msgdec); + } + + return msgdec; + } + + public boolean taken() + { + return _taken.getAndSet(true); + } + + public void release() + { + _taken.set(false); + } + public boolean checkToken(Object token) { if (_tokens.contains(token)) @@ -324,124 +453,215 @@ public class AMQMessage } } + /** + * Registers a queue to which this message is to be delivered. This is + * called from the exchange when it is routing the message. This will be called before any content bodies have + * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria. + * + * @param queue the queue + * @throws org.apache.qpid.AMQException if there is an error enqueuing the message + */ public void enqueue(AMQQueue queue) throws AMQException { - //if the message is not persistent or the queue is not durable - //we will not need to recover the association and so do not - //need to record it - if (isPersistent() && queue.isDurable()) - { - _store.enqueueMessage(queue.getName(), _messageId); - } + _transientMessageData.addDestinationQueue(queue); } - public void dequeue(AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException { - //only record associations where both queue and message will survive - //a restart, so only need to remove association if this is the case - if (isPersistent() && queue.isDurable()) - { - _store.dequeueMessage(queue.getName(), _messageId); - } + _messageHandle.dequeue(storeContext, _messageId, queue); } public boolean isPersistent() throws AMQException { - if (_contentHeaderBody == null) + if (_transientMessageData != null) { - throw new AMQException("Cannot determine delivery mode of message. Content header not found."); + return _transientMessageData.isPersistent(); } + else + { + return _messageHandle.isPersistent(_messageId); + } + } - //todo remove literal values to a constant file such as AMQConstants in common - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties - && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + /** + * Called to enforce the 'immediate' flag. + * + * @throws NoConsumersException if the message is marked for + * immediate delivery but has not been marked as delivered to a + * consumer + */ + public void checkDeliveredToConsumer() throws NoConsumersException, AMQException + { + BasicPublishBody pb = getPublishBody(); + if (pb.immediate && !_deliveredToConsumer) + { + throw new NoConsumersException(this); + } } - public void setTxnBuffer(TxnBuffer buffer) + public BasicPublishBody getPublishBody() throws AMQException { - _txnBuffer = buffer; + BasicPublishBody pb; + if (_transientMessageData != null) + { + pb = _transientMessageData.getPublishBody(); + } + else + { + pb = _messageHandle.getPublishBody(_messageId); + } + return pb; } - public TxnBuffer getTxnBuffer() + public boolean isRedelivered() { - return _txnBuffer; + return _messageHandle.isRedelivered(); } - /** - * Called to enforce the 'immediate' flag. - * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer - */ - public void checkDeliveredToConsumer() throws NoConsumersException + public void setRedelivered(boolean redelivered) { - if (isImmediate() && !_deliveredToConsumer) - { - throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); - } + _messageHandle.setRedelivered(redelivered); } /** * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). - * And by selectors to determin if the message has already been sent */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; } - /** - * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer - */ - public boolean getDeliveredToConsumer() + private void deliver(StoreContext storeContext) throws AMQException { - return _deliveredToConsumer; - } - + // we get a reference to the destination queues now so that we can clear the + // transient message data as quickly as possible + List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); + try + { + // first we allow the handle to know that the message has been fully received. This is useful if it is + // maintaining any calculated values based on content chunks + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(), + _transientMessageData.getContentHeaderBody()); - public MessageDecorator getDecodedMessage(String type) - { - MessageDecorator msgtype = null; + // we then allow the transactional context to do something with the message content + // now that it has all been received, before we attempt delivery + _txnContext.messageFullyReceived(isPersistent()); - if (_decodedMessages != null) - { - msgtype = _decodedMessages.get(type); + _transientMessageData = null; - if (msgtype == null) + for (AMQQueue q : destinationQueues) { - msgtype = decorateMessage(type); + _txnContext.deliver(this, q); } } - - return msgtype; + finally + { + destinationQueues.clear(); + decrementReference(storeContext); + } } - private MessageDecorator decorateMessage(String type) + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag) + throws AMQException { - MessageDecorator msgdec = null; + ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); - if (type.equals(JMS_MESSAGE)) + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) { - msgdec = new JMSMessage(this); + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); } - if (msgdec != null) + // + // Now start writing out the other content bodies + // + while (bodyFrameIterator.hasNext()) { - _decodedMessages.put(type, msgdec); + protocolSession.writeFrame(bodyFrameIterator.next()); } - return msgdec; } - public boolean taken() + private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag) + throws AMQException { - return _taken.getAndSet(true); + BasicPublishBody pb = getPublishBody(); + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag, + deliveryTag, pb.exchange, _messageHandle.isRedelivered(), + pb.routingKey); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; } - public void release() + private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException { - _taken.set(false); + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, + replyCode, replyText, + getPublishBody().routingKey); + ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? + returnFrame.writePayload(buf); + buf.flip(); + return buf; + } + + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText) + throws AMQException + { + ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + protocolSession.writeFrame(bodyFrameIterator.next()); + } + } + + public String toString() + { + return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + + _taken; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java new file mode 100644 index 0000000000..ef5d460b9b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * A pluggable way of getting message data. Implementations can provide intelligent caching for example or + * even no caching at all to minimise the broker memory footprint. + * + * The method all take a messageId to avoid having to store it in the instance - the AMQMessage container + * must already keen the messageId so it is pointless storing it twice. + */ +public interface AMQMessageHandle +{ + ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException; + + /** + * @return the number of body frames associated with this message + */ + int getBodyCount(long messageId) throws AMQException; + + /** + * @return the size of the body + */ + long getBodySize(long messageId) throws AMQException; + + /** + * Get a particular content body + * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1 + * @return a content body + * @throws IllegalArgumentException if the index is invalid + */ + ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException; + + void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException; + + BasicPublishBody getPublishBody(long messageId) throws AMQException; + + boolean isRedelivered(); + + void setRedelivered(boolean redelivered); + + boolean isPersistent(long messageId) throws AMQException; + + void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + ContentHeaderBody contentHeaderBody) + throws AMQException; + + void removeMessage(StoreContext storeContext, long messageId) throws AMQException; + + void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; + + void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..6f1018e753 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,8 +27,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.txn.TxnBuffer; -import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.store.StoreContext; import javax.management.JMException; import java.text.MessageFormat; @@ -344,17 +343,17 @@ public class AMQQueue implements Managable, Comparable /** * Removes the AMQMessage from the top of the queue. */ - public void deleteMessageFromTop() throws AMQException + public void deleteMessageFromTop(StoreContext storeContext) throws AMQException { - _deliveryMgr.removeAMessageFromTop(); + _deliveryMgr.removeAMessageFromTop(storeContext); } /** * removes all the messages from the queue. */ - public void clearQueue() throws AMQException + public void clearQueue(StoreContext storeContext) throws AMQException { - _deliveryMgr.clearAllMessages(); + _deliveryMgr.clearAllMessages(storeContext); } public void bind(String routingKey, Exchange exchange) @@ -378,7 +377,7 @@ public class AMQQueue implements Managable, Comparable { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } @@ -443,30 +442,9 @@ public class AMQQueue implements Managable, Comparable delete(); } - public void deliver(AMQMessage msg) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { - TxnBuffer buffer = msg.getTxnBuffer(); - if (buffer == null) - { - //non-transactional - record(msg); - process(msg); - } - else - { - buffer.enlist(new Deliver(msg)); - } - } - - private void record(AMQMessage msg) throws AMQException - { - msg.enqueue(this); - msg.incrementReference(); - } - - private void process(AMQMessage msg) throws FailedDequeueException - { - _deliveryMgr.deliver(getName(), msg); + _deliveryMgr.deliver(storeContext, getName(), msg); try { msg.checkDeliveredToConsumer(); @@ -476,16 +454,16 @@ public class AMQQueue implements Managable, Comparable { // as this message will be returned, it should be removed // from the queue: - dequeue(msg); + dequeue(storeContext, msg); } } - void dequeue(AMQMessage msg) throws FailedDequeueException + void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException { try { - msg.dequeue(this); - msg.decrementReference(); + msg.dequeue(storeContext, this); + msg.decrementReference(storeContext); } catch (MessageCleanupException e) { @@ -511,10 +489,17 @@ public class AMQQueue implements Managable, Comparable return _subscribers; } - protected void updateReceivedMessageCount(AMQMessage msg) + protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { _totalMessagesReceived++; - _managedObject.checkForNotification(msg); + try + { + _managedObject.checkForNotification(msg); + } + catch (JMException e) + { + throw new AMQException("Unable to get notification from manage queue: " + e, e); + } } public boolean equals(Object o) @@ -550,45 +535,4 @@ public class AMQQueue implements Managable, Comparable _logger.debug(MessageFormat.format(msg, args)); } } - - private class Deliver implements TxnOp - { - private final AMQMessage _msg; - - Deliver(AMQMessage msg) - { - _msg = msg; - } - - public void prepare() throws AMQException - { - //do the persistent part of the record() - _msg.enqueue(AMQQueue.this); - } - - public void undoPrepare() - { - } - - public void commit() - { - //do the memeory part of the record() - _msg.incrementReference(); - //then process the message - try - { - process(_msg); - } - catch (FailedDequeueException e) - { - //TODO: is there anything else we can do here? I think not... - _logger.error("Error during commit of a queue delivery: " + e, e); - } - } - - public void rollback() - { - } - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 1bdf265a1b..77bbdf7b4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -35,6 +36,7 @@ import javax.management.OperationsException; import javax.management.monitor.MonitorNotification; import java.util.List; import java.util.ArrayList; +import java.util.Iterator; /** * MBean class for AMQQueue. It implements all the management features exposed @@ -43,6 +45,12 @@ import java.util.ArrayList; @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { + /** + * Since the MBean is not associated with a real channel we can safely create our own store context + * for use in the few methods that require one. + */ + private StoreContext _storeContext = new StoreContext(); + private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method @@ -165,7 +173,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue /** * returns the size of messages(KB) in the queue. */ - public Long getQueueDepth() + public Long getQueueDepth() throws JMException { List<AMQMessage> list = _queue.getMessagesOnTheQueue(); if (list.size() == 0) @@ -174,9 +182,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } long queueDepth = 0; - for (AMQMessage message : list) + try { - queueDepth = queueDepth + getMessageSize(message); + for (AMQMessage message : list) + { + queueDepth = queueDepth + getMessageSize(message); + } + } + catch (AMQException e) + { + throw new JMException("Unable to get message size: " + e); } return (long) Math.round(queueDepth / 1000); } @@ -184,7 +199,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue /** * returns size of message in bytes */ - private long getMessageSize(AMQMessage msg) + private long getMessageSize(AMQMessage msg) throws AMQException { if (msg == null) { @@ -197,7 +212,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) + public void checkForNotification(AMQMessage msg) throws AMQException, JMException { // Check for threshold message count Integer msgCount = getMessageCount(); @@ -233,13 +248,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } /** - * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop() + * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop */ public void deleteMessageFromTop() throws JMException { try { - _queue.deleteMessageFromTop(); + _queue.deleteMessageFromTop(_storeContext); } catch (AMQException ex) { @@ -248,13 +263,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } /** - * @see org.apache.qpid.server.queue.AMQQueue#clearQueue() + * @see org.apache.qpid.server.queue.AMQQueue#clearQueue */ public void clearQueue() throws JMException { try { - _queue.clearQueue(); + _queue.clearQueue(_storeContext); } catch (AMQException ex) { @@ -273,11 +288,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } // get message content - List<ContentBody> cBodies = msg.getContentBodies(); + Iterator<ContentBody> cBodies = msg.getContentBodyIterator(); List<Byte> msgContent = new ArrayList<Byte>(); - if (cBodies != null) + while (cBodies.hasNext()) { - for (ContentBody body : cBodies) + ContentBody body = cBodies.next(); + if (body.getSize() != 0) { if (body.getSize() != 0) { @@ -290,17 +306,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } } - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) + try { - mimeType = headerProperties.getContentType(); - encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) + { + mimeType = headerProperties.getContentType(); + encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + } + Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); + } + catch (AMQException e) + { + throw new JMException("Error creating header attributes list: " + e); } - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; - - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } /** @@ -317,17 +339,24 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue List<AMQMessage> list = _queue.getMessagesOnTheQueue(); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - // Create the tabular list of message header contents - for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + try { - AMQMessage msg = list.get(i - 1); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; - String[] headerAttributes = headerProperties.toString().split(","); - Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); + // Create the tabular list of message header contents + for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + { + AMQMessage msg = list.get(i - 1); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + String[] headerAttributes = headerProperties.toString().split(","); + Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); + _messageList.put(messageData); + } + } + catch (AMQException e) + { + throw new JMException("Error creating message contents: " + e); } return _messageList; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index 022d3b9635..a2898ccdce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,18 +22,17 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; /** @@ -46,16 +45,19 @@ public class ConcurrentDeliveryManager implements DeliveryManager @Configured(path = "advanced.compressBufferOnQueue", defaultValue = "false") public boolean compressBufferOnQueue; + /** * Holds any queued messages */ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + //private int _messageCount; /** * Ensures that only one asynchronous task is running for this manager at * any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); + /** * The subscriptions on the queue to whom messages are delivered */ @@ -67,7 +69,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager */ private final AMQQueue _queue; - /** * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered @@ -77,7 +78,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager */ private ReentrantLock _lock = new ReentrantLock(); - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -101,14 +101,13 @@ public class ConcurrentDeliveryManager implements DeliveryManager return hasQueuedMessages(); } - /** * @param msg to enqueue * @return true if we are queue this message */ - private boolean enqueue(AMQMessage msg) + private boolean enqueue(AMQMessage msg) throws AMQException { - if (msg.isImmediate()) + if (msg.getPublishBody().immediate) { return false; } @@ -133,9 +132,9 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - private void startQueueing(AMQMessage msg) + private void startQueueing(AMQMessage msg) throws AMQException { - if (!msg.isImmediate()) + if (!msg.getPublishBody().immediate) { addMessageToQueue(msg); } @@ -143,7 +142,10 @@ public class ConcurrentDeliveryManager implements DeliveryManager private boolean addMessageToQueue(AMQMessage msg) { - // Shrink the ContentBodies to their actual size to save memory. + // Shrink the ContentBodies to their actual size to save memory. + /* TODO need to reimplement this - probably not in this class though + * for obvious reasons + if (compressBufferOnQueue) { Iterator it = msg.getContentBodies().iterator(); @@ -153,16 +155,14 @@ public class ConcurrentDeliveryManager implements DeliveryManager cb.reduceBufferToFit(); } } - + */ _messages.offer(msg); return true; } - public boolean hasQueuedMessages() { - _lock.lock(); try { @@ -172,8 +172,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager { _lock.unlock(); } - - } public int getQueueMessageCount() @@ -203,21 +201,21 @@ public class ConcurrentDeliveryManager implements DeliveryManager //no-op . This DM has no PreDeliveryQueues } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -226,7 +224,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager * Only one thread should ever execute this method concurrently, but * it can do so while other threads invoke deliver(). */ - private void processQueue() + private void processQueue() throws AMQException { try { @@ -296,7 +294,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - public void deliver(String name, AMQMessage msg) throws FailedDequeueException + public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) @@ -308,7 +306,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager Subscription s = _subscriptions.nextSubscriber(msg); if (s == null) { - if (!msg.isImmediate()) + if (!msg.getPublishBody().immediate) { // no subscribers yet so enter 'queueing' mode and queue this message startQueueing(msg); @@ -333,7 +331,18 @@ public class ConcurrentDeliveryManager implements DeliveryManager boolean running = true; while (running) { - processQueue(); + try + { + processQueue(); + } + catch (AMQException e) + { + _log.error("Error processing queue: " + e, e); + _log.error("Delivery manager terminating."); + running = false; + _processing.set(false); + break; + } //Check that messages have not been added since we did our last peek(); // Synchronize with the thread that adds to the queue. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f09e8213b1..8f0c3a5ec7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -26,6 +26,7 @@ import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; import java.util.Iterator; @@ -99,10 +100,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) { - Iterator it = msg.getContentBodies().iterator(); + Iterator<ContentBody> it = msg.getContentBodyIterator(); while (it.hasNext()) { - ContentBody cb = (ContentBody) it.next(); + ContentBody cb = it.next(); cb.reduceBufferToFit(); } } @@ -167,21 +168,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -223,7 +224,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); } - catch (FailedDequeueException e) + catch (AMQException e) { message.release(); _log.error("Unable to deliver message as dequeue failed: " + e, e); @@ -279,11 +280,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.poll(); } - public void deliver(String name, AMQMessage msg) throws FailedDequeueException + public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :" + System.identityHashCode(msg)); + _log.debug(id() + "deliver :" + msg); } //Check if we have someone to deliver the message to. @@ -296,9 +297,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); + _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery"); } - if (!msg.isImmediate()) + if (!msg.getPublishBody().immediate) { addMessageToQueue(msg); @@ -308,7 +309,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -330,7 +331,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg); @@ -345,7 +346,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } //Deliver the message diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index cac499587f..82d8f9538f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import java.util.concurrent.Executor; import java.util.List; @@ -66,11 +67,11 @@ interface DeliveryManager * @param msg the message to deliver * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued */ - void deliver(String name, AMQMessage msg) throws FailedDequeueException; + void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException; - void removeAMessageFromTop() throws AMQException; + void removeAMessageFromTop(StoreContext storeContext) throws AMQException; - void clearAllMessages() throws AMQException; + void clearAllMessages(StoreContext storeContext) throws AMQException; List<AMQMessage> getMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java new file mode 100644 index 0000000000..e2758ce6cb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +import java.util.LinkedList; +import java.util.List; + +/** + */ +public class InMemoryMessageHandle implements AMQMessageHandle +{ + + private ContentHeaderBody _contentHeaderBody; + + private BasicPublishBody _publishBody; + + private List<ContentBody> _contentBodies = new LinkedList<ContentBody>(); + + private boolean _redelivered; + + public InMemoryMessageHandle() + { + } + + public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException + { + return _contentHeaderBody; + } + + public int getBodyCount(long messageId) + { + return _contentBodies.size(); + } + + public long getBodySize(long messageId) throws AMQException + { + return getContentHeaderBody(messageId).bodySize; + } + + public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException + { + if (index > _contentBodies.size() - 1) + { + throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + + (_contentBodies.size() - 1)); + } + return _contentBodies.get(index); + } + + public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) + throws AMQException + { + _contentBodies.add(contentBody); + } + + public BasicPublishBody getPublishBody(long messageId) throws AMQException + { + return _publishBody; + } + + public boolean isRedelivered() + { + return _redelivered; + } + + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + } + + public boolean isPersistent(long messageId) throws AMQException + { + //todo remove literal values to a constant file such as AMQConstants in common + ContentHeaderBody chb = getContentHeaderBody(messageId); + return chb.properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; + } + + /** + * This is called when all the content has been received. + * @param publishBody + * @param contentHeaderBody + * @throws AMQException + */ + public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + _publishBody = publishBody; + _contentHeaderBody = contentHeaderBody; + } + + public void removeMessage(StoreContext storeContext, long messageId) throws AMQException + { + // NO OP + } + + public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + { + // NO OP + } + + public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + { + // NO OP + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index de5d0f55a7..c36cc6bd7b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,11 +23,12 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.management.MBeanAttribute; import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanOperationParameter; +import org.apache.qpid.AMQException; import javax.management.JMException; import javax.management.MBeanOperationInfo; -import javax.management.openmbean.TabularData; import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; import java.io.IOException; /** @@ -70,7 +71,7 @@ public interface ManagedQueue * @throws IOException */ @MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue") - Long getQueueDepth() throws IOException; + Long getQueueDepth() throws IOException, JMException; /** * Returns the total number of active subscribers to the queue. @@ -184,7 +185,7 @@ public interface ManagedQueue description="Message headers for messages in this queue within given index range. eg. from index 1 - 100") TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex, @MBeanOperationParameter(name="to index", description="to index")int toIndex) - throws IOException, JMException; + throws IOException, JMException, AMQException; @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id") CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java new file mode 100644 index 0000000000..a7ada2c1f8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.store.MessageStore; + +/** + * Constructs a message handle based on the publish body, the content header and the queue to which the message + * has been routed. + * + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class MessageHandleFactory +{ + + public AMQMessageHandle createMessageHandle(long messageId, MessageStore store, boolean persistent) + { + // just hardcoded for now + if (persistent) + { + return new WeakReferenceMessageHandle(store); + } + else + { + return new InMemoryMessageHandle(); + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java new file mode 100644 index 0000000000..deed18c188 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * Encapsulates a publish body and a content header. In the context of the message store these are treated as a + * single unit. + */ +public class MessageMetaData +{ + private BasicPublishBody _publishBody; + + private ContentHeaderBody _contentHeaderBody; + + private int _contentChunkCount; + + public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) + { + _contentHeaderBody = contentHeaderBody; + _publishBody = publishBody; + _contentChunkCount = contentChunkCount; + } + + public int getContentChunkCount() + { + return _contentChunkCount; + } + + public void setContentChunkCount(int contentChunkCount) + { + _contentChunkCount = contentChunkCount; + } + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) + { + _contentHeaderBody = contentHeaderBody; + } + + public BasicPublishBody getPublishBody() + { + return _publishBody; + } + + public void setPublishBody(BasicPublishBody publishBody) + { + _publishBody = publishBody; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2d37b806f6..2049189e0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -20,13 +20,8 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQConstant; - -import java.util.List; +import org.apache.qpid.server.RequiredDeliveryException; /** * Signals that no consumers exist for a message at a given point in time. @@ -35,19 +30,9 @@ import java.util.List; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(String queue, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) - { - super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies); - } - - public NoConsumersException(BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(AMQMessage message) { - super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery is not possible.", message); } public int getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index a5672f2b19..2dab551e07 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,11 +26,11 @@ import java.util.Queue; public interface Subscription { - void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException; + void send(AMQMessage msg, AMQQueue queue) throws AMQException; boolean isSuspended(); - void queueDeleted(AMQQueue queue); + void queueDeleted(AMQQueue queue) throws AMQException; boolean hasFilters(); @@ -44,5 +44,5 @@ public interface Subscription void close(); - boolean isBrowser(); + boolean isBrowser(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 78310e8eb3..0dc1f3b0c1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,18 +23,18 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import java.util.Queue; @@ -201,7 +201,7 @@ public class SubscriptionImpl implements Subscription * @param queue * @throws AMQException */ - public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + public void send(AMQMessage msg, AMQQueue queue) throws AMQException { if (msg != null) { @@ -211,7 +211,7 @@ public class SubscriptionImpl implements Subscription } else { - sendToConsumer(msg, queue); + sendToConsumer(channel.getStoreContext(), msg, queue); } } else @@ -220,7 +220,7 @@ public class SubscriptionImpl implements Subscription } } - private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -235,14 +235,12 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - - protocolSession.writeFrame(frame); + msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); } } - private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) + throws AMQException { try { @@ -257,7 +255,11 @@ public class SubscriptionImpl implements Subscription // the message is unacked, it will be lost. if (!_acks) { - queue.dequeue(msg); + if (_logger.isDebugEnabled()) + { + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + queue.dequeue(storeContext, msg); } synchronized(channel) { @@ -268,10 +270,7 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - - protocolSession.writeFrame(frame); + msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); } } finally @@ -290,7 +289,7 @@ public class SubscriptionImpl implements Subscription * * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { channel.queueDeleted(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 91e720ea54..8272202571 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -204,7 +204,7 @@ class SubscriptionSet implements WeightedSubscriptionManager * * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { for (Subscription s : _subscriptions) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index c967ea2cde..f290452058 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import org.apache.log4j.Logger; import java.util.LinkedList; @@ -41,11 +42,13 @@ class SynchronizedDeliveryManager implements DeliveryManager * Holds any queued messages */ private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>(); + /** * Ensures that only one asynchronous task is running for this manager at * any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); + /** * The subscriptions on the queue to whom messages are delivered */ @@ -70,9 +73,9 @@ class SynchronizedDeliveryManager implements DeliveryManager _queue = queue; } - private synchronized boolean enqueue(AMQMessage msg) + private synchronized boolean enqueue(AMQMessage msg) throws AMQException { - if (msg.isImmediate()) + if (msg.getPublishBody().immediate) { return false; } @@ -90,7 +93,7 @@ class SynchronizedDeliveryManager implements DeliveryManager } } - private synchronized void startQueueing(AMQMessage msg) + private synchronized void startQueueing(AMQMessage msg) throws AMQException { _queueing = true; enqueue(msg); @@ -127,21 +130,21 @@ class SynchronizedDeliveryManager implements DeliveryManager //no-op . This DM has no PreDeliveryQueues } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -231,7 +234,7 @@ class SynchronizedDeliveryManager implements DeliveryManager * @throws NoConsumersException if there are no active subscribers to deliver * the message to */ - public void deliver(String name, AMQMessage msg) throws FailedDequeueException + public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java new file mode 100644 index 0000000000..0c50dc5207 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -0,0 +1,118 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; + +import java.util.List; +import java.util.LinkedList; + +/** + * Contains data that is only used in AMQMessage transiently, e.g. while the content + * body fragments are arriving. + * + * Having this data stored in a separate class means that the AMQMessage class avoids + * the small overhead of numerous guaranteed-null references. + * + * @author Apache Software Foundation + */ +public class TransientMessageData +{ + /** + * Stored temporarily until the header has been received at which point it is used when + * constructing the handle + */ + private BasicPublishBody _publishBody; + + /** + * Also stored temporarily. + */ + private ContentHeaderBody _contentHeaderBody; + + /** + * Keeps a track of how many bytes we have received in body frames + */ + private long _bodyLengthReceived = 0; + + /** + * This is stored during routing, to know the queues to which this message should immediately be + * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done + * by the message handle. + */ + private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + + public BasicPublishBody getPublishBody() + { + return _publishBody; + } + + public void setPublishBody(BasicPublishBody publishBody) + { + _publishBody = publishBody; + } + + public List<AMQQueue> getDestinationQueues() + { + return _destinationQueues; + } + + public void setDestinationQueues(List<AMQQueue> destinationQueues) + { + _destinationQueues = destinationQueues; + } + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) + { + _contentHeaderBody = contentHeaderBody; + } + + public long getBodyLengthReceived() + { + return _bodyLengthReceived; + } + + public void addBodyLength(int value) + { + _bodyLengthReceived += value; + } + + public boolean isAllContentReceived() throws AMQException + { + return _bodyLengthReceived == _contentHeaderBody.bodySize; + } + + public void addDestinationQueue(AMQQueue queue) + { + _destinationQueues.add(queue); + } + + public boolean isPersistent() + { + //todo remove literal values to a constant file such as AMQConstants in common + return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java new file mode 100644 index 0000000000..2fb2bdd2e3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.List; +import java.util.LinkedList; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class WeakReferenceMessageHandle implements AMQMessageHandle +{ + private WeakReference<ContentHeaderBody> _contentHeaderBody; + + private WeakReference<BasicPublishBody> _publishBody; + + private List<WeakReference<ContentBody>> _contentBodies; + + private boolean _redelivered; + + private final MessageStore _messageStore; + + public WeakReferenceMessageHandle(MessageStore messageStore) + { + _messageStore = messageStore; + } + + public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException + { + ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null); + if (chb == null) + { + MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + chb = mmd.getContentHeaderBody(); + _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb); + _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); + } + return chb; + } + + public int getBodyCount(long messageId) throws AMQException + { + if (_contentBodies == null) + { + MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + int chunkCount = mmd.getContentChunkCount(); + _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount); + for (int i = 0; i < chunkCount; i++) + { + _contentBodies.add(new WeakReference<ContentBody>(null)); + } + } + return _contentBodies.size(); + } + + public long getBodySize(long messageId) throws AMQException + { + return getContentHeaderBody(messageId).bodySize; + } + + public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException + { + if (index > _contentBodies.size() - 1) + { + throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + + (_contentBodies.size() - 1)); + } + WeakReference<ContentBody> wr = _contentBodies.get(index); + ContentBody cb = wr.get(); + if (cb == null) + { + cb = _messageStore.getContentBodyChunk(messageId, index); + _contentBodies.set(index, new WeakReference<ContentBody>(cb)); + } + return cb; + } + + /** + * Content bodies are set <i>before</i> the publish and header frames + * @param storeContext + * @param messageId + * @param contentBody + * @throws AMQException + */ + public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException + { + if (_contentBodies == null) + { + _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + } + _contentBodies.add(new WeakReference<ContentBody>(contentBody)); + _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody); + } + + public BasicPublishBody getPublishBody(long messageId) throws AMQException + { + BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null); + if (bpb == null) + { + MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + bpb = mmd.getPublishBody(); + _publishBody = new WeakReference<BasicPublishBody>(bpb); + _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); + } + return bpb; + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + } + + public boolean isPersistent(long messageId) throws AMQException + { + //todo remove literal values to a constant file such as AMQConstants in common + ContentHeaderBody chb = getContentHeaderBody(messageId); + return chb.properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; + } + + /** + * This is called when all the content has been received. + * @param publishBody + * @param contentHeaderBody + * @throws AMQException + */ + public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + // if there are no content bodies the list will be null so we must + // create en empty list here + if (contentHeaderBody.bodySize == 0) + { + _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + } + _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody, + _contentBodies.size())); + _publishBody = new WeakReference<BasicPublishBody>(publishBody); + _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody); + } + + public void removeMessage(StoreContext storeContext, long messageId) throws AMQException + { + _messageStore.removeMessage(storeContext, messageId); + } + + public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + { + _messageStore.enqueueMessage(storeContext, queue.getName(), messageId); + } + + public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + { + _messageStore.dequeueMessage(storeContext, queue.getName(), messageId); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 328aed81d9..edf2386314 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,10 +23,12 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -43,21 +45,25 @@ public class MemoryMessageStore implements MessageStore private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity"; - protected ConcurrentMap<Long, AMQMessage> _messageMap; + protected ConcurrentMap<Long, MessageMetaData> _metaDataMap; + + protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); public void configure() { - _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables"); + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) { int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); - _log.info("Using capacity " + hashtableCapacity + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(hashtableCapacity); + _log.info("Using capacity " + hashtableCapacity + " for hash tables"); + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity); } public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception @@ -67,70 +73,71 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { - if (_messageMap != null) + if (_metaDataMap != null) { - _messageMap.clear(); - _messageMap = null; + _metaDataMap.clear(); + _metaDataMap = null; + } + if (_contentBodyMap != null) + { + _contentBodyMap.clear(); + _contentBodyMap = null; } } - public void put(AMQMessage msg) - { - _messageMap.put(msg.getMessageId(), msg); - } - - public void removeMessage(long messageId) + public void removeMessage(StoreContext context, long messageId) { if (_log.isDebugEnabled()) { _log.debug("Removing message with id " + messageId); } - _messageMap.remove(messageId); + _metaDataMap.remove(messageId); + _contentBodyMap.remove(messageId); } public void createQueue(AMQQueue queue) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } public void removeQueue(String name) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public void enqueueMessage(String name, long messageId) throws AMQException + public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public void dequeueMessage(String name, long messageId) throws AMQException + public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public void beginTran() throws AMQException + public void beginTran(StoreContext context) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public void commitTran() throws AMQException + public void commitTran(StoreContext context) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public void abortTran() throws AMQException + public void abortTran(StoreContext context) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + // Not required to do anything } - public boolean inTran() + public boolean inTran(StoreContext context) { return false; } public List<AMQQueue> createQueues() throws AMQException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public long getNewMessageId() @@ -138,8 +145,33 @@ public class MemoryMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public AMQMessage getMessage(long messageId) + public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) + throws AMQException + { + List<ContentBody> bodyList = _contentBodyMap.get(messageId); + if (bodyList == null) + { + bodyList = new ArrayList<ContentBody>(); + _contentBodyMap.put(messageId, bodyList); + } + + bodyList.add(index, contentBody); + } + + public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) + throws AMQException + { + _metaDataMap.put(messageId, messageMetaData); + } + + public MessageMetaData getMessageMetaData(long messageId) throws AMQException + { + return _metaDataMap.get(messageId); + } + + public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException { - return _messageMap.get(messageId); + List<ContentBody> bodyList = _contentBodyMap.get(messageId); + return bodyList.get(index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 8ee1d09862..973c661c06 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,8 +22,9 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; import java.util.List; @@ -37,34 +38,33 @@ public interface MessageStore * @param base the base element identifier from which all configuration items are relative. For example, if the base * element is "store", the all elements used by concrete classes will be "store.foo" etc. * @param config the apache commons configuration object + * @throws Exception if an error occurs that means the store is unable to configure itself */ void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. - * @throws Exception + * @throws Exception if close fails */ void close() throws Exception; - void put(AMQMessage msg) throws AMQException; - - void removeMessage(long messageId) throws AMQException; + void removeMessage(StoreContext storeContext, long messageId) throws AMQException; void createQueue(AMQQueue queue) throws AMQException; void removeQueue(String name) throws AMQException; - void enqueueMessage(String name, long messageId) throws AMQException; + void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException; - void dequeueMessage(String name, long messageId) throws AMQException; + void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException; - void beginTran() throws AMQException; + void beginTran(StoreContext context) throws AMQException; - void commitTran() throws AMQException; + void commitTran(StoreContext context) throws AMQException; - void abortTran() throws AMQException; + void abortTran(StoreContext context) throws AMQException; - boolean inTran(); + boolean inTran(StoreContext context); /** * Recreate all queues that were persisted, including re-enqueuing of existing messages @@ -78,6 +78,13 @@ public interface MessageStore * @return a message id */ long getNewMessageId(); -} + void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException; + void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException; + + MessageMetaData getMessageMetaData(long messageId) throws AMQException; + + ContentBody getContentBodyChunk(long messageId, int index) throws AMQException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java new file mode 100644 index 0000000000..55e5067852 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +/** + * A context that the store can use to associate with a transactional context. For example, it could store + * some kind of txn id. + * + * @author Apache Software Foundation + */ +public class StoreContext +{ + private Object _payload; + + public Object getPayload() + { + return _payload; + } + + public void setPayload(Object payload) + { + _payload = payload; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java new file mode 100644 index 0000000000..6d5d3c42f3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -0,0 +1,91 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.store.StoreContext; + +import java.util.List; + +/** + * @author Apache Software Foundation + */ +public class CleanupMessageOperation implements TxnOp +{ + private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class); + + private final AMQMessage _msg; + + private final List<RequiredDeliveryException> _returns; + + public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns) + { + _msg = msg; + _returns = returns; + } + + public void prepare(StoreContext context) throws AMQException + { + } + + public void undoPrepare() + { + //don't need to do anything here, if the store's txn failed + //when processing prepare then the message was not stored + //or enqueued on any queues and can be discarded + } + + public void commit(StoreContext context) + { + //The routers reference can now be released. This is done + //here to ensure that it happens after the queues that + //enqueue it have incremented their counts (which as a + //memory only operation is done in the commit phase). + try + { + _msg.decrementReference(context); + } + catch (AMQException e) + { + _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); + } + try + { + _msg.checkDeliveredToConsumer(); + } + catch (NoConsumersException e) + { + //TODO: store this for delivery after the commit-ok + _returns.add(e); + } + catch (AMQException e) + { + _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " + + e, e); + } + } + + public void rollback(StoreContext context) + { + // NO OP + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java new file mode 100644 index 0000000000..3934943665 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public class DeliverMessageOperation implements TxnOp +{ + private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class); + + private final AMQMessage _msg; + + private final AMQQueue _queue; + + public DeliverMessageOperation(AMQMessage msg, AMQQueue queue) + { + _msg = msg; + _queue = queue; + _msg.incrementReference(); + } + + public void prepare(StoreContext context) throws AMQException + { + } + + public void undoPrepare() + { + } + + public void commit(StoreContext context) + { + //do the memeory part of the record() + _msg.incrementReference(); + //then process the message + try + { + _queue.process(context, _msg); + } + catch (AMQException e) + { + //TODO: is there anything else we can do here? I think not... + _logger.error("Error during commit of a queue delivery: " + e, e); + } + } + + public void rollback(StoreContext storeContext) + { + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java new file mode 100644 index 0000000000..ff6abdd58b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -0,0 +1,148 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.ack.TxAck; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +import java.util.List; + +/** + * A transactional context that only supports local transactions. + */ +public class LocalTransactionalContext implements TransactionalContext +{ + private final TxnBuffer _txnBuffer; + + /** + * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are + * consolidated into a single operation + */ + private TxAck _ackOp; + + private List<RequiredDeliveryException> _returnMessages; + + private final MessageStore _messageStore; + + private final StoreContext _storeContext; + + private boolean _inTran = false; + + public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, + TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages) + { + _messageStore = messageStore; + _storeContext = storeContext; + _txnBuffer = txnBuffer; + _returnMessages = returnMessages; + _txnBuffer.enlist(new StoreMessageOperation(messageStore)); + } + + public void rollback() throws AMQException + { + _txnBuffer.rollback(_storeContext); + } + + public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + { + // A publication will result in the enlisting of several + // TxnOps. The first is an op that will store the message. + // Following that (and ordering is important), an op will + // be added for every queue onto which the message is + // enqueued. Finally a cleanup op will be added to decrement + // the reference associated with the routing. + + _txnBuffer.enlist(new DeliverMessageOperation(message, queue)); + _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + } + + private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); + } + } + + public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + { + //check that the tag exists to give early failure + if (!multiple || deliveryTag > 0) + { + checkAck(deliveryTag, unacknowledgedMessageMap); + } + //we use a single txn op for all acks and update this op + //as new acks come in. If this is the first ack in the txn + //we will need to create and enlist the op. + if (_ackOp == null) + { + _ackOp = new TxAck(unacknowledgedMessageMap); + _txnBuffer.enlist(_ackOp); + } + // update the op to include this ack request + if (multiple && deliveryTag == 0) + { + // if have signalled to ack all, that refers only + // to all at this time + _ackOp.update(lastDeliveryTag, multiple); + } + else + { + _ackOp.update(deliveryTag, multiple); + } + } + + public void messageFullyReceived(boolean persistent) throws AMQException + { + // Not required in this transactional context + } + + public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException + { + // Not required in this transactional context + } + + public void beginTranIfNecessary() throws AMQException + { + if (!_inTran) + { + _messageStore.beginTran(_storeContext); + _inTran = true; + } + } + + public void commit() throws AMQException + { + if (_ackOp != null) + { + _ackOp.consolidate(); + //already enlisted, after commit will reset regardless of outcome + _ackOp = null; + } + + _txnBuffer.commit(_storeContext); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java new file mode 100644 index 0000000000..f09d811b50 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -0,0 +1,208 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * @author Apache Software Foundation + */ +public class NonTransactionalContext implements TransactionalContext +{ + private static final Logger _log = Logger.getLogger(NonTransactionalContext.class); + + /** + * Channel is useful for logging + */ + private final AMQChannel _channel; + + /** + * Where to put undeliverable messages + */ + private final List<RequiredDeliveryException> _returnMessages; + + private Set<Long> _browsedAcks; + + private final MessageStore _messageStore; + + private StoreContext _storeContext; + + /** + * Whether we are in a transaction + */ + private boolean _inTran; + + public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, + List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks) + { + _channel = channel; + _storeContext = storeContext; + _returnMessages = returnMessages; + _messageStore = messageStore; + _browsedAcks = browsedAcks; + } + + public void beginTranIfNecessary() throws AMQException + { + if (!_inTran) + { + _messageStore.beginTran(_storeContext); + _inTran = true; + } + } + + public void commit() throws AMQException + { + // Does not apply to this context + } + + public void rollback() throws AMQException + { + // Does not apply to this context + } + + public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + { + try + { + message.incrementReference(); + queue.process(_storeContext, message); + //following check implements the functionality + //required by the 'immediate' flag: + message.checkDeliveredToConsumer(); + } + catch (NoConsumersException e) + { + _returnMessages.add(e); + } + } + + public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag, + boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) + throws AMQException + { + if (multiple) + { + if (deliveryTag == 0) + { + + //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, + // tells the server to acknowledge all outstanding mesages. + _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + + unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + if (!_browsedAcks.contains(deliveryTag)) + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + message.message.getMessageId()); + } + message.discard(_storeContext); + } + else + { + _browsedAcks.remove(deliveryTag); + } + return false; + } + + public void visitComplete() + { + unacknowledgedMessageMap.clear(); + } + }); + } + else + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + } + + LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); + unacknowledgedMessageMap.drainTo(acked, deliveryTag); + for (UnacknowledgedMessage msg : acked) + { + if (!_browsedAcks.contains(deliveryTag)) + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + msg.discard(_storeContext); + } + else + { + _browsedAcks.remove(deliveryTag); + } + } + } + } + else + { + UnacknowledgedMessage msg; + msg = unacknowledgedMessageMap.remove(deliveryTag); + + if (msg == null) + { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + } + msg.discard(_storeContext); + if (_log.isDebugEnabled()) + { + _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + + msg.message.getMessageId()); + } + } + } + + public void messageFullyReceived(boolean persistent) throws AMQException + { + if (persistent) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } + } + + public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException + { + _channel.processReturns(protocolSession); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java new file mode 100644 index 0000000000..0e4d6c2030 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +/** + * A transactional operation to store messages in an underlying persistent store. When this operation + * commits it will do everything to ensure that all messages are safely committed to persistent + * storage. + */ +public class StoreMessageOperation implements TxnOp +{ + private final MessageStore _messsageStore; + + public StoreMessageOperation(MessageStore messageStore) + { + _messsageStore = messageStore; + } + + public void prepare(StoreContext context) throws AMQException + { + } + + public void undoPrepare() + { + } + + public void commit(StoreContext context) throws AMQException + { + _messsageStore.commitTran(context); + } + + public void rollback(StoreContext context) throws AMQException + { + _messsageStore.abortTran(context); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java new file mode 100644 index 0000000000..bba4b98de4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +/** + * @author Robert Greig (robert.j.greig@jpmorgan.com) + */ +public interface TransactionalContext +{ + void beginTranIfNecessary() throws AMQException; + + void commit() throws AMQException; + + void rollback() throws AMQException; + + void deliver(AMQMessage message, AMQQueue queue) throws AMQException; + + void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException; + + void messageFullyReceived(boolean persistent) throws AMQException; + + void messageProcessed(AMQProtocolSession protocolSession) throws AMQException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index 402065d5e1..069caf0ae1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,91 +22,63 @@ package org.apache.qpid.server.txn; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; import java.util.List; /** * Holds a list of TxnOp instance representing transactional - * operations. + * operations. */ public class TxnBuffer { - private boolean _containsPersistentChanges = false; - private final MessageStore _store; private final List<TxnOp> _ops = new ArrayList<TxnOp>(); private static final Logger _log = Logger.getLogger(TxnBuffer.class); - public TxnBuffer(MessageStore store) - { - _store = store; - } - - public void containsPersistentChanges() + public TxnBuffer() { - _containsPersistentChanges = true; } - public void commit() throws AMQException + public void commit(StoreContext context) throws AMQException { - if (_containsPersistentChanges) + if (prepare(context)) { - _log.debug("Begin Transaction."); - _store.beginTran(); - if(prepare()) + for (TxnOp op : _ops) { - _log.debug("Transaction Succeeded"); - _store.commitTran(); - for (TxnOp op : _ops) - { - op.commit(); - } + op.commit(context); } - else - { - _log.debug("Transaction Failed"); - _store.abortTran(); - } - }else{ - if(prepare()) - { - for (TxnOp op : _ops) - { - op.commit(); - } - } } _ops.clear(); } - private boolean prepare() - { + private boolean prepare(StoreContext context) + { for (int i = 0; i < _ops.size(); i++) { TxnOp op = _ops.get(i); try { - op.prepare(); + op.prepare(context); } - catch(Exception e) + catch (Exception e) { //compensate previously prepared ops for(int j = 0; j < i; j++) { _ops.get(j).undoPrepare(); - } + } return false; } } return true; - } + } - public void rollback() throws AMQException + public void rollback(StoreContext context) throws AMQException { for (TxnOp op : _ops) { - op.rollback(); + op.rollback(context); } _ops.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java index e863bab73e..919c078cf0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; /** * This provides the abstraction of an individual operation within a @@ -29,14 +30,14 @@ import org.apache.qpid.AMQException; public interface TxnOp { /** - * Do the part of the operation that updates persistent state + * Do the part of the operation that updates persistent state */ - public void prepare() throws AMQException; + public void prepare(StoreContext context) throws AMQException; /** * Complete the operation started by prepare. Can now update in * memory state or make netork transfers. */ - public void commit(); + public void commit(StoreContext context) throws AMQException; /** * This is not the same as rollback. Unfortunately the use of an * in memory reference count as a locking mechanism and a test for @@ -50,5 +51,5 @@ public interface TxnOp /** * Rolls back the operation. */ - public void rollback(); + public void rollback(StoreContext context) throws AMQException; } |