diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-06 12:41:25 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-06 12:41:25 +0000 |
commit | be821e59060035f7041c35353aafbf576e92b55b (patch) | |
tree | 04cd382ff4a86c9eb1fe35b6db1ec8e638398679 | |
parent | a14806948d841c98842a6b45aaf36250c13ef42b (diff) | |
download | qpid-python-be821e59060035f7041c35353aafbf576e92b55b.tar.gz |
Improvements to support message recovery on broker startup
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@483058 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 169 insertions, 53 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index f2cd0d13cd..12ae8330a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -54,23 +54,8 @@ public class AMQMessage private AMQMessageHandle _messageHandle; - /** - * Stored temporarily until the header has been received at which point it is used when - * constructing the handle - */ - private BasicPublishBody _publishBody; - - /** - * Also stored temporarily. - */ - private ContentHeaderBody _contentHeaderBody; - - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; - - private final TransactionalContext _txnContext; + // TODO: ideally this should be able to go into the transient message date - check this! (RG) + private TransactionalContext _txnContext; /** * Flag to indicate whether message has been delivered to a @@ -79,12 +64,7 @@ public class AMQMessage */ private boolean _deliveredToConsumer; - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done - * by the message handle. - */ - private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + private TransientMessageData _transientMessageData = new TransientMessageData(); /** * Used to iterate through all the body frames associated with this message. Will not @@ -160,7 +140,7 @@ public class AMQMessage { _messageId = messageId; _txnContext = txnContext; - _publishBody = publishBody; + _transientMessageData.setPublishBody(publishBody); if (_log.isDebugEnabled()) { _log.debug("Message created with id " + messageId); @@ -168,6 +148,22 @@ public class AMQMessage } /** + * Used when recovering, i.e. when the message store is creating references to messages. + * In that case, the normal enqueue/routingComplete is not done since the recovery process + * is responsible for routing the messages to queues. + * @param messageId + * @param store + * @param factory + * @throws AMQException + */ + public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException + { + _messageId = messageId; + _messageHandle = factory.createMessageHandle(messageId, store, true); + _transientMessageData = null; + } + + /** * Used in testing only. This allows the passing of the content header immediately * on construction. * @param messageId @@ -200,7 +196,7 @@ public class AMQMessage MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, publishBody, txnContext, contentHeader); - _destinationQueues = destinationQueues; + _transientMessageData.setDestinationQueues(destinationQueues); routingComplete(messageStore, messageHandleFactory); for (ContentBody cb : contentBodies) { @@ -214,6 +210,7 @@ public class AMQMessage _messageHandle = msg._messageHandle; _txnContext = msg._txnContext; _deliveredToConsumer = msg._deliveredToConsumer; + _transientMessageData = msg._transientMessageData; } public Iterator<AMQDataBlock> getBodyFrameIterator(int channel) @@ -228,9 +225,9 @@ public class AMQMessage public ContentHeaderBody getContentHeaderBody() throws AMQException { - if (_contentHeaderBody != null) + if (_transientMessageData != null) { - return _contentHeaderBody; + return _transientMessageData.getContentHeaderBody(); } else { @@ -241,7 +238,7 @@ public class AMQMessage public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { - _contentHeaderBody = contentHeaderBody; + _transientMessageData.setContentHeaderBody(contentHeaderBody); } public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException @@ -255,12 +252,12 @@ public class AMQMessage // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store - for (AMQQueue q : _destinationQueues) + for (AMQQueue q : _transientMessageData.getDestinationQueues()) { _messageHandle.enqueue(_messageId, q); } - if (_contentHeaderBody.bodySize == 0) + if (_transientMessageData.getContentHeaderBody().bodySize == 0) { deliver(); } @@ -268,7 +265,7 @@ public class AMQMessage public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException { - _bodyLengthReceived += contentBody.getSize(); + _transientMessageData.addBodyLength(contentBody.getSize()); _messageHandle.addContentBodyFrame(_messageId, contentBody); if (isAllContentReceived()) { @@ -283,7 +280,7 @@ public class AMQMessage public boolean isAllContentReceived() throws AMQException { - return _bodyLengthReceived == _contentHeaderBody.bodySize; + return _transientMessageData.isAllContentReceived(); } public long getMessageId() @@ -384,7 +381,7 @@ public class AMQMessage */ public void enqueue(AMQQueue queue) throws AMQException { - _destinationQueues.add(queue); + _transientMessageData.addDestinationQueue(queue); } public void dequeue(AMQQueue queue) throws AMQException @@ -394,11 +391,9 @@ public class AMQMessage public boolean isPersistent() throws AMQException { - if (_contentHeaderBody != null) + if (_transientMessageData != null) { - //todo remove literal values to a constant file such as AMQConstants in common - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + return _transientMessageData.isPersistent(); } else { @@ -425,9 +420,9 @@ public class AMQMessage public BasicPublishBody getPublishBody() throws AMQException { BasicPublishBody pb; - if (_publishBody != null) + if (_transientMessageData != null) { - pb = _publishBody; + pb = _transientMessageData.getPublishBody(); } else { @@ -452,26 +447,30 @@ public class AMQMessage private void deliver() throws AMQException { - // first we allow the handle to know that the message has been fully received. This is useful if it is - // maintaining any calculated values based on content chunks + // we get a reference to the destination queues now so that we can clear the + // transient message data as quickly as possible + List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); try { - _messageHandle.setPublishAndContentHeaderBody(_messageId, _publishBody, _contentHeaderBody); - _publishBody = null; - _contentHeaderBody = null; + // first we allow the handle to know that the message has been fully received. This is useful if it is + // maintaining any calculated values based on content chunks + _messageHandle.setPublishAndContentHeaderBody(_messageId, _transientMessageData.getPublishBody(), + _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - for (AMQQueue q : _destinationQueues) + + _transientMessageData = null; + + for (AMQQueue q : destinationQueues) { _txnContext.deliver(this, q); } } finally { - _destinationQueues.clear(); - _destinationQueues = null; + destinationQueues.clear(); decrementReference(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java new file mode 100644 index 0000000000..0c50dc5207 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -0,0 +1,118 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; + +import java.util.List; +import java.util.LinkedList; + +/** + * Contains data that is only used in AMQMessage transiently, e.g. while the content + * body fragments are arriving. + * + * Having this data stored in a separate class means that the AMQMessage class avoids + * the small overhead of numerous guaranteed-null references. + * + * @author Apache Software Foundation + */ +public class TransientMessageData +{ + /** + * Stored temporarily until the header has been received at which point it is used when + * constructing the handle + */ + private BasicPublishBody _publishBody; + + /** + * Also stored temporarily. + */ + private ContentHeaderBody _contentHeaderBody; + + /** + * Keeps a track of how many bytes we have received in body frames + */ + private long _bodyLengthReceived = 0; + + /** + * This is stored during routing, to know the queues to which this message should immediately be + * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done + * by the message handle. + */ + private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + + public BasicPublishBody getPublishBody() + { + return _publishBody; + } + + public void setPublishBody(BasicPublishBody publishBody) + { + _publishBody = publishBody; + } + + public List<AMQQueue> getDestinationQueues() + { + return _destinationQueues; + } + + public void setDestinationQueues(List<AMQQueue> destinationQueues) + { + _destinationQueues = destinationQueues; + } + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) + { + _contentHeaderBody = contentHeaderBody; + } + + public long getBodyLengthReceived() + { + return _bodyLengthReceived; + } + + public void addBodyLength(int value) + { + _bodyLengthReceived += value; + } + + public boolean isAllContentReceived() throws AMQException + { + return _bodyLengthReceived == _contentHeaderBody.bodySize; + } + + public void addDestinationQueue(AMQQueue queue) + { + _destinationQueues.add(queue); + } + + public boolean isPersistent() + { + //todo remove literal values to a constant file such as AMQConstants in common + return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index bf82940ec8..ba25ee32d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -41,7 +41,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException { - ContentHeaderBody chb = _contentHeaderBody.get(); + ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null); if (chb == null) { MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); @@ -87,7 +87,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle public BasicPublishBody getPublishBody(long messageId) throws AMQException { - BasicPublishBody bpb = _publishBody.get(); + BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null); if (bpb == null) { MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java index bb7397f194..06aa144237 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,6 @@ package org.apache.qpid.client.failover; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.failover.FailoverException; import javax.jms.JMSException; @@ -56,7 +55,7 @@ public abstract class FailoverSupport } catch (FailoverException e) { - _log.info("Failover exception caught during operation"); + _log.info("Failover exception caught during operation: " + e, e); } } } |