summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java520
1 files changed, 520 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
new file mode 100644
index 0000000000..afab90dcc4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -0,0 +1,520 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
+public class AMQMessage
+{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
+ /**
+ * Used in clustering
+ */
+ private final Set<Object> _tokens = new HashSet<Object>();
+
+ /**
+ * Used in clustering
+ * TODO need to get rid of this
+ */
+ private AMQProtocolSession _publisher;
+
+ private final long _messageId;
+
+ private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+ private AMQMessageHandle _messageHandle;
+
+ /**
+ * Stored temporarily until the header has been received at which point it is used when
+ * constructing the handle
+ */
+ private BasicPublishBody _publishBody;
+
+ /**
+ * Also stored temporarily.
+ */
+ private ContentHeaderBody _contentHeaderBody;
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ private final TransactionalContext _txnContext;
+
+ /**
+ * Flag to indicate whether message has been delivered to a
+ * consumer. Used in implementing return functionality for
+ * messages published with the 'immediate' flag.
+ */
+ private boolean _deliveredToConsumer;
+
+ /**
+ * This is stored during routing, to know the queues to which this message should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
+ * by the message handle.
+ */
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not
+ * keep all the data in memory therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
+ {
+ private int _channel;
+
+ private int _index = -1;
+
+ private BodyFrameIterator(int channel)
+ {
+ _channel = channel;
+ }
+
+ public boolean hasNext()
+ {
+ return _index < _messageHandle.getBodyCount() - 1;
+ }
+
+ public AMQDataBlock next()
+ {
+ try
+ {
+ ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+ return ContentBody.createAMQFrame(_channel, cb);
+ }
+ catch (AMQException e)
+ {
+ // have no choice but to throw a runtime exception
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class BodyContentIterator implements Iterator<ContentBody>
+ {
+
+ private int _index = -1;
+
+ public boolean hasNext()
+ {
+ return _index < _messageHandle.getBodyCount() - 1;
+ }
+
+ public ContentBody next()
+ {
+ try
+ {
+ return _messageHandle.getContentBody(_messageId, ++_index);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+ {
+ _messageId = messageId;
+ _txnContext = txnContext;
+ _publishBody = publishBody;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message created with id " + messageId);
+ }
+ }
+
+ protected AMQMessage(AMQMessage msg) throws AMQException
+ {
+ _publisher = msg._publisher;
+ _messageId = msg._messageId;
+ _messageHandle = msg._messageHandle;
+ _txnContext = msg._txnContext;
+ _deliveredToConsumer = msg._deliveredToConsumer;
+ }
+
+ public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
+ {
+ return new BodyFrameIterator(channel);
+ }
+
+ public Iterator<ContentBody> getContentBodyIterator()
+ {
+ return new BodyContentIterator();
+ }
+
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
+ {
+ return _messageHandle.getContentHeaderBody(_messageId);
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException
+ {
+ final boolean persistent = isPersistent();
+ _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+ if (persistent)
+ {
+ _txnContext.beginTranIfNecessary();
+ }
+
+ // enqueuing the messages ensure that if required the destinations are recorded to a
+ // persistent store
+ for (AMQQueue q : _destinationQueues)
+ {
+ _messageHandle.enqueue(_messageId, q);
+ }
+
+ if (_contentHeaderBody.bodySize == 0)
+ {
+ deliver();
+ }
+ }
+
+ public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
+ {
+ _bodyLengthReceived += contentBody.getSize();
+ _messageHandle.addContentBodyFrame(_messageId, contentBody);
+ if (isAllContentReceived())
+ {
+ deliver();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public boolean isAllContentReceived() throws AMQException
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ /**
+ * Threadsafe. Increment the reference count on the message.
+ */
+ public void incrementReference()
+ {
+ _referenceCount.incrementAndGet();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference() throws MessageCleanupException
+ {
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (_referenceCount.decrementAndGet() == 0)
+ {
+ try
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ }
+ _messageHandle.removeMessage(_messageId);
+ }
+ catch (AMQException e)
+ {
+ //to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(_messageId, e);
+ }
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+ if (_referenceCount.get() < 0)
+ {
+ Thread.dumpStack();
+ }
+ }
+ }
+ }
+
+ public void setPublisher(AMQProtocolSession publisher)
+ {
+ _publisher = publisher;
+ }
+
+ public AMQProtocolSession getPublisher()
+ {
+ return _publisher;
+ }
+
+ public boolean checkToken(Object token)
+ {
+ if (_tokens.contains(token))
+ {
+ return true;
+ }
+ else
+ {
+ _tokens.add(token);
+ return false;
+ }
+ }
+
+ /**
+ * Registers a queue to which this message is to be delivered. This is
+ * called from the exchange when it is routing the message. This will be called before any content bodies have
+ * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ *
+ * @param queue the queue
+ * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
+ */
+ public void enqueue(AMQQueue queue) throws AMQException
+ {
+ _destinationQueues.add(queue);
+ }
+
+ public void dequeue(AMQQueue queue) throws AMQException
+ {
+ _messageHandle.dequeue(_messageId, queue);
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ if (_contentHeaderBody != null)
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+ else
+ {
+ return _messageHandle.isPersistent(_messageId);
+ }
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @throws NoConsumersException if the message is marked for
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
+ */
+ public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
+ {
+ BasicPublishBody pb = getPublishBody();
+ if (pb.immediate && !_deliveredToConsumer)
+ {
+ throw new NoConsumersException(this);
+ }
+ }
+
+ public BasicPublishBody getPublishBody() throws AMQException
+ {
+ BasicPublishBody pb;
+ if (_publishBody != null)
+ {
+ pb = _publishBody;
+ }
+ else
+ {
+ pb = _messageHandle.getPublishBody(_messageId);
+ }
+ return pb;
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to
+ * implement the 'immediate' flag functionality).
+ */
+ public void setDeliveredToConsumer()
+ {
+ _deliveredToConsumer = true;
+ }
+
+ /*public void registerQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ } */
+
+ private void deliver() throws AMQException
+ {
+ // first we allow the handle to know that the message has been fully received. This is useful if it is
+ // maintaining any calculated values based on content chunks
+ try
+ {
+ _messageHandle.setPublishAndContentHeaderBody(_messageId, _publishBody, _contentHeaderBody);
+ _publishBody = null;
+ _contentHeaderBody = null;
+
+ // we then allow the transactional context to do something with the message content
+ // now that it has all been received, before we attempt delivery
+ _txnContext.messageFullyReceived(isPersistent());
+ for (AMQQueue q : _destinationQueues)
+ {
+ _txnContext.deliver(this, q);
+ }
+ }
+ finally
+ {
+ _destinationQueues.clear();
+ _destinationQueues = null;
+ decrementReference();
+ }
+ }
+
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag)
+ throws AMQException
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ protocolSession.writeFrame(bodyFrameIterator.next());
+ }
+
+ }
+
+ private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag)
+ throws AMQException
+ {
+ BasicPublishBody pb = getPublishBody();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, consumerTag,
+ deliveryTag, false, pb.exchange,
+ pb.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText)
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, _publishBody.exchange,
+ _publishBody.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ protocolSession.writeFrame(bodyFrameIterator.next());
+ }
+ }
+}