diff options
author | Rupert Smith <rupertlssmith@apache.org> | 2007-07-02 14:17:45 +0000 |
---|---|---|
committer | Rupert Smith <rupertlssmith@apache.org> | 2007-07-02 14:17:45 +0000 |
commit | e4f9a8d7e300a4267b1b61a8c6839f08df648e6b (patch) | |
tree | 31627d2c8ecde567da68d8008b9869c6f083be34 | |
parent | 3a614544977df9c1443edb653b939062c6325c5c (diff) | |
download | qpid-python-e4f9a8d7e300a4267b1b61a8c6839f08df648e6b.tar.gz |
Added some documentation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@552499 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 350 insertions, 228 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 87c7db46e4..e77e47b69a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,45 +20,44 @@ */ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.registry.ApplicationRegistry; - -/** Combines the information that make up a deliverable message into a more manageable form. */ -import org.apache.log4j.Logger; - -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** Combines the information that make up a deliverable message into a more manageable form. */ +/** + * A deliverable message. + */ public class AMQMessage { + /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); - /** Used in clustering */ + /** Used in clustering. @todo What for? */ private Set<Object> _tokens; - /** Only use in clustering - should ideally be removed? */ + /** Only use in clustering. @todo What for? */ private AMQProtocolSession _publisher; private final Long _messageId; @@ -67,33 +66,27 @@ public class AMQMessage private AMQMessageHandle _messageHandle; - // TODO: ideally this should be able to go into the transient message date - check this! (RG) + /** Holds the transactional context in which this message is being processed. */ private TransactionalContext _txnContext; /** - * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for - * messages published with the 'immediate' flag. + * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality + * for messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; - /** - * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the - * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body - * removed from the store. - */ + + /** Flag to indicate that this message requires 'immediate' delivery. */ private boolean _immediate; - // private Subscription _takenBySubcription; - // private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Set<Subscription> _rejectedBy = null; - private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - private final int hashcode = System.identityHashCode(this); private long _expiration; @@ -104,8 +97,10 @@ public class AMQMessage public void setExpiration() { - long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); - long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + long expiration = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + long timestamp = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) { @@ -118,10 +113,10 @@ public class AMQMessage { if (timestamp != 0L) { - //todo perhaps use arrival time + // todo perhaps use arrival time long diff = (System.currentTimeMillis() - timestamp); - if (diff > 1000L || diff < 1000L) + if ((diff > 1000L) || (diff < 1000L)) { _expiration = expiration + diff; } @@ -152,11 +147,12 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Unable to get body count: " + e, e); + return false; } } @@ -166,7 +162,10 @@ public class AMQMessage try { - AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), + _messageId, ++_index)); + return new AMQFrame(_channel, cb); } catch (AMQException e) @@ -202,11 +201,12 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Error getting body count: " + e, e); + return false; } } @@ -229,8 +229,7 @@ public class AMQMessage } } - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext) + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; @@ -250,7 +249,8 @@ public class AMQMessage * * @throws AMQException */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException + public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) + throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(messageId, store, true); @@ -266,8 +266,8 @@ public class AMQMessage * @param txnContext * @param contentHeader */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader) throws AMQException { this(messageId, info, txnContext); setContentHeaderBody(contentHeader); @@ -285,11 +285,9 @@ public class AMQMessage * * @throws AMQException */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, - ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, - List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, - MessageHandleFactory messageHandleFactory) throws AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, + MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); @@ -331,13 +329,13 @@ public class AMQMessage } } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - throws AMQException + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _transientMessageData.setContentHeaderBody(contentHeaderBody); } - public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException + public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) + throws AMQException { final boolean persistent = isPersistent(); _messageHandle = factory.createMessageHandle(_messageId, store, persistent); @@ -368,6 +366,7 @@ public class AMQMessage if (allContentReceived) { deliver(storeContext); + return true; } else @@ -392,7 +391,8 @@ public class AMQMessage */ public AMQMessage takeReference() { - incrementReference();// _referenceCount.incrementAndGet(); + incrementReference(); // _referenceCount.incrementAndGet(); + return this; } @@ -400,10 +400,10 @@ public class AMQMessage protected void incrementReference() { _referenceCount.incrementAndGet(); -// if (_log.isDebugEnabled()) -// { -// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } } /** @@ -427,10 +427,10 @@ public class AMQMessage { try { -// if (_log.isDebugEnabled()) -// { -// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed @@ -441,7 +441,7 @@ public class AMQMessage } catch (AMQException e) { - //to maintain consistency, we revert the count + // to maintain consistency, we revert the count incrementReference(); throw new MessageCleanupException(_messageId, e); } @@ -450,7 +450,8 @@ public class AMQMessage { if (count < 0) { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + + " has gone below 0."); } } } @@ -477,7 +478,7 @@ public class AMQMessage public boolean isTaken(AMQQueue queue) { - //return _taken.get(); + // return _taken.get(); synchronized (this) { @@ -494,15 +495,15 @@ public class AMQMessage public boolean taken(AMQQueue queue, Subscription sub) { -// if (_taken.getAndSet(true)) -// { -// return true; -// } -// else -// { -// _takenBySubcription = sub; -// return false; -// } + // if (_taken.getAndSet(true)) + // { + // return true; + // } + // else + // { + // _takenBySubcription = sub; + // return false; + // } synchronized (this) { @@ -520,6 +521,7 @@ public class AMQMessage { _takenMap.put(queue, taken); _takenBySubcriptionMap.put(queue, sub); + return false; } } @@ -532,9 +534,8 @@ public class AMQMessage _log.trace("Releasing Message:" + debugIdentity()); } -// _taken.set(false); -// _takenBySubcription = null; - + // _taken.set(false); + // _takenBySubcription = null; synchronized (this) { @@ -568,6 +569,7 @@ public class AMQMessage else { _tokens.add(token); + return false; } } @@ -629,6 +631,7 @@ public class AMQMessage { pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } + return pb; } @@ -659,7 +662,7 @@ public class AMQMessage */ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException { - //note: If the storecontext isn't need then we can remove the getChannel() from Subscription. + // note: If the storecontext isn't need then we can remove the getChannel() from Subscription. if (_expiration != 0L) { @@ -668,6 +671,7 @@ public class AMQMessage if (now > _expiration) { dequeue(storecontext, queue); + return true; } } @@ -690,12 +694,13 @@ public class AMQMessage { _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues); } + try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(), - _transientMessageData.getContentHeaderBody()); + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, + _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -705,9 +710,9 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - //Increment the references to this message for each queue delivery. + // Increment the references to this message for each queue delivery. incrementReference(); - //normal deliver so add this message at the end. + // normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } } @@ -719,182 +724,181 @@ public class AMQMessage } } -/* - public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + /* + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + - protocolSession.writeFrame(compositeBlock); } - else + + public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { + ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - // - // Now start writing out the other content bodies - // - for (int i = 1; i < bodyCount; i++) - { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } } - } + private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, + deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; + } - public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException - { - ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); + private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + deliveryTag, pb.getExchange(), + queueSize, + _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? + getOkFrame.writePayload(buf); + buf.flip(); + return buf; + } - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - protocolSession.writeFrame(compositeBlock); + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + getMessagePublishInfo().getExchange(), + replyCode, replyText, + getMessagePublishInfo().getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? + returnFrame.writePayload(buf); + buf.flip(); + return buf; } - else + + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) + throws AMQException { + ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } // // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded // - for (int i = 1; i < bodyCount; i++) + while (bodyFrameIterator.hasNext()) { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + protocolSession.writeFrame(bodyFrameIterator.next()); } - - } - - - } - - - private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, - deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? - getOkFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - getMessagePublishInfo().getExchange(), - replyCode, replyText, - getMessagePublishInfo().getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? - returnFrame.writePayload(buf); - buf.flip(); - return buf; - } - - public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, - new AMQDataBlock[]{contentHeader}); - protocolSession.writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - protocolSession.writeFrame(bodyFrameIterator.next()); - } - } -*/ + */ public AMQMessageHandle getMessageHandle() { return _messageHandle; } - public long getSize() { try @@ -906,12 +910,12 @@ public class AMQMessage catch (AMQException e) { _log.error(e.toString(), e); + return 0; } } - public void restoreTransientMessageData() throws AMQException { TransientMessageData transientMessageData = new TransientMessageData(); @@ -921,25 +925,23 @@ public class AMQMessage _transientMessageData = transientMessageData; } - public void clearTransientMessageData() { _transientMessageData = null; } - public String toString() { -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + -// _taken + " by :" + _takenBySubcription; + // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + // _taken + " by :" + _takenBySubcription; - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + - _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { -// return _takenBySubcription; + // return _takenBySubcription; synchronized (this) { return _takenBySubcriptionMap.get(queue); @@ -967,7 +969,7 @@ public class AMQMessage { boolean rejected = _rejectedBy != null; - if (rejected) // We have subscriptions that rejected this message + if (rejected) // We have subscriptions that rejected this message { return _rejectedBy.contains(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index 88451e2fca..fee25c07df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -28,24 +28,144 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoreContext; /** - * @author Robert Greig (robert.j.greig@jpmorgan.com) + * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed. + * Different levels of transactional support for the delivery of messages may be provided by different implementations + * of this interface. + * + * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'. + * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage} + * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Explicitly accept a transaction start notification. + * <tr><td> Commit all pending operations in a transaction. + * <tr><td> Rollback all pending operations in a transaction. + * <tr><td> Deliver a message to a queue as part of a transaction. + * <tr><td> Redeliver a message to a queue as part of a transaction. + * <tr><td> Mark a message as acknowledged as part of a transaction. + * <tr><td> Accept notification that a message has been completely received as part of a transaction. + * <tr><td> Accept notification that a message has been fully processed as part of a transaction. + * <tr><td> Associate a message store context with this transaction context. + * </table> + * + * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional + * context. They are non-transactional operations, used to trigger other side-effects. Consider moving them + * somewhere else, a seperate interface for example. + * + * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides + * transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any + * queue implementation could be made transactional by wrapping it as a transactional queue. This would mean + * that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be + * conceptually neater. + * + * For example: + * <pre> + * public interface Transactional + * { + * public void commit(); + * public void rollback(); + * } + * + * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E> + * {} + * + * public class Queues + * { + * ... + * // For transactional messaging, take a transactional view onto the queue. + * public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... } + * + * // For non-transactional messaging, take a non-transactional view onto the queue. + * public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... } + * } + * </pre> */ public interface TransactionalContext { + /** + * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback} + * should automatically begin the next transaction in the chain. + * + * @throws AMQException If the transaction cannot be started for any reason. + */ void beginTranIfNecessary() throws AMQException; + /** + * Makes all pending operations on the transaction permanent and visible. + * + * @throws AMQException If the transaction cannot be committed for any reason. + */ void commit() throws AMQException; + /** + * Erases all pending operations on the transaction. + * + * @throws AMQException If the transaction cannot be committed for any reason. + */ void rollback() throws AMQException; + /** + * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a + * redelivery, and should be placed on the front of the queue. + * + * <p/>This is an 'enqueue' operation. + * + * @param message The message to deliver. + * @param queue The queue to deliver the message to. + * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt> + * for normal FIFO message ordering. + * + * @throws AMQException If the message cannot be delivered for any reason. + */ void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException; + /** + * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by + * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple' + * flag is set, in which case an acknowledgement up to the latest delivered message should be done. + * + * <p/>This is a 'dequeue' operation. + * + * @param deliveryTag The id of the message to acknowledge, or zero, if using multiple acknowledgement + * up to the latest message. + * @param lastDeliveryTag The latest message delivered. + * @param multiple <tt>true</tt> if all message ids up the acknowledged one or latest delivered, are + * to be acknowledged, <tt>false</tt> otherwise. + * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message + * from. + * + * @throws AMQException If the message cannot be acknowledged for any reason. + */ void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, - UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException; + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException; + /** + * Notifies the transactional context that a message has been fully received. The actual message that was received + * is not specified. This event may be used to trigger a process related to the receipt of the message, for example, + * flushing its data to disk. + * + * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise. + * + * @throws AMQException If the fully received event cannot be processed for any reason. + */ void messageFullyReceived(boolean persistent) throws AMQException; + /** + * Notifies the transactional context that a message has been delivered, succesfully or otherwise. The actual + * message that was delivered is not specified. This event may be used to trigger a process related to the + * outcome of the delivery of the message, for example, cleaning up failed deliveries. + * + * @param protocolSession The protocol session of the deliverable message. + * + * @throws AMQException If the message processed event cannot be handled for any reason. + */ void messageProcessed(AMQProtocolSession protocolSession) throws AMQException; + /** + * Gets the message store context associated with this transactional context. + * + * @return The message store context associated with this transactional context. + */ StoreContext getStoreContext(); } diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 272a7a4c8b..720fc8cf17 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -201,8 +201,8 @@ <TQC-Qpid-02>-n TQC-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=1 rate=1000 maxPending=1000000 </TQC-Qpid-02> <TQC-Qpid-03>-n TQC-Qpid-03 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-03> <TQC-Qpid-04>-n TQC-Qpid-04 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-04> - <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05> - <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06> + <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05> + <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06> <TQM-Qpid-01-512b>-n TQM-Qpid-01-512b -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=10 batchSize=1000 messageSize=512 destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-01-512b> <TQM-Qpid-02-512b>-n TQM-Qpid-02-512b -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=10 batchSize=1000 messageSize=512 destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-02-512b> |