summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-06 12:41:25 +0000
committerRobert Greig <rgreig@apache.org>2006-12-06 12:41:25 +0000
commitbe821e59060035f7041c35353aafbf576e92b55b (patch)
tree04cd382ff4a86c9eb1fe35b6db1ec8e638398679
parenta14806948d841c98842a6b45aaf36250c13ef42b (diff)
downloadqpid-python-be821e59060035f7041c35353aafbf576e92b55b.tar.gz
Improvements to support message recovery on broker startup
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@483058 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java93
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java118
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java7
4 files changed, 169 insertions, 53 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index f2cd0d13cd..12ae8330a6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -54,23 +54,8 @@ public class AMQMessage
private AMQMessageHandle _messageHandle;
- /**
- * Stored temporarily until the header has been received at which point it is used when
- * constructing the handle
- */
- private BasicPublishBody _publishBody;
-
- /**
- * Also stored temporarily.
- */
- private ContentHeaderBody _contentHeaderBody;
-
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
-
- private final TransactionalContext _txnContext;
+ // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+ private TransactionalContext _txnContext;
/**
* Flag to indicate whether message has been delivered to a
@@ -79,12 +64,7 @@ public class AMQMessage
*/
private boolean _deliveredToConsumer;
- /**
- * This is stored during routing, to know the queues to which this message should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
- * by the message handle.
- */
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private TransientMessageData _transientMessageData = new TransientMessageData();
/**
* Used to iterate through all the body frames associated with this message. Will not
@@ -160,7 +140,7 @@ public class AMQMessage
{
_messageId = messageId;
_txnContext = txnContext;
- _publishBody = publishBody;
+ _transientMessageData.setPublishBody(publishBody);
if (_log.isDebugEnabled())
{
_log.debug("Message created with id " + messageId);
@@ -168,6 +148,22 @@ public class AMQMessage
}
/**
+ * Used when recovering, i.e. when the message store is creating references to messages.
+ * In that case, the normal enqueue/routingComplete is not done since the recovery process
+ * is responsible for routing the messages to queues.
+ * @param messageId
+ * @param store
+ * @param factory
+ * @throws AMQException
+ */
+ public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+ {
+ _messageId = messageId;
+ _messageHandle = factory.createMessageHandle(messageId, store, true);
+ _transientMessageData = null;
+ }
+
+ /**
* Used in testing only. This allows the passing of the content header immediately
* on construction.
* @param messageId
@@ -200,7 +196,7 @@ public class AMQMessage
MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, publishBody, txnContext, contentHeader);
- _destinationQueues = destinationQueues;
+ _transientMessageData.setDestinationQueues(destinationQueues);
routingComplete(messageStore, messageHandleFactory);
for (ContentBody cb : contentBodies)
{
@@ -214,6 +210,7 @@ public class AMQMessage
_messageHandle = msg._messageHandle;
_txnContext = msg._txnContext;
_deliveredToConsumer = msg._deliveredToConsumer;
+ _transientMessageData = msg._transientMessageData;
}
public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
@@ -228,9 +225,9 @@ public class AMQMessage
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- if (_contentHeaderBody != null)
+ if (_transientMessageData != null)
{
- return _contentHeaderBody;
+ return _transientMessageData.getContentHeaderBody();
}
else
{
@@ -241,7 +238,7 @@ public class AMQMessage
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
throws AMQException
{
- _contentHeaderBody = contentHeaderBody;
+ _transientMessageData.setContentHeaderBody(contentHeaderBody);
}
public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException
@@ -255,12 +252,12 @@ public class AMQMessage
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- for (AMQQueue q : _destinationQueues)
+ for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
_messageHandle.enqueue(_messageId, q);
}
- if (_contentHeaderBody.bodySize == 0)
+ if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
deliver();
}
@@ -268,7 +265,7 @@ public class AMQMessage
public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
{
- _bodyLengthReceived += contentBody.getSize();
+ _transientMessageData.addBodyLength(contentBody.getSize());
_messageHandle.addContentBodyFrame(_messageId, contentBody);
if (isAllContentReceived())
{
@@ -283,7 +280,7 @@ public class AMQMessage
public boolean isAllContentReceived() throws AMQException
{
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ return _transientMessageData.isAllContentReceived();
}
public long getMessageId()
@@ -384,7 +381,7 @@ public class AMQMessage
*/
public void enqueue(AMQQueue queue) throws AMQException
{
- _destinationQueues.add(queue);
+ _transientMessageData.addDestinationQueue(queue);
}
public void dequeue(AMQQueue queue) throws AMQException
@@ -394,11 +391,9 @@ public class AMQMessage
public boolean isPersistent() throws AMQException
{
- if (_contentHeaderBody != null)
+ if (_transientMessageData != null)
{
- //todo remove literal values to a constant file such as AMQConstants in common
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ return _transientMessageData.isPersistent();
}
else
{
@@ -425,9 +420,9 @@ public class AMQMessage
public BasicPublishBody getPublishBody() throws AMQException
{
BasicPublishBody pb;
- if (_publishBody != null)
+ if (_transientMessageData != null)
{
- pb = _publishBody;
+ pb = _transientMessageData.getPublishBody();
}
else
{
@@ -452,26 +447,30 @@ public class AMQMessage
private void deliver() throws AMQException
{
- // first we allow the handle to know that the message has been fully received. This is useful if it is
- // maintaining any calculated values based on content chunks
+ // we get a reference to the destination queues now so that we can clear the
+ // transient message data as quickly as possible
+ List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
try
{
- _messageHandle.setPublishAndContentHeaderBody(_messageId, _publishBody, _contentHeaderBody);
- _publishBody = null;
- _contentHeaderBody = null;
+ // first we allow the handle to know that the message has been fully received. This is useful if it is
+ // maintaining any calculated values based on content chunks
+ _messageHandle.setPublishAndContentHeaderBody(_messageId, _transientMessageData.getPublishBody(),
+ _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- for (AMQQueue q : _destinationQueues)
+
+ _transientMessageData = null;
+
+ for (AMQQueue q : destinationQueues)
{
_txnContext.deliver(this, q);
}
}
finally
{
- _destinationQueues.clear();
- _destinationQueues = null;
+ destinationQueues.clear();
decrementReference();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
new file mode 100644
index 0000000000..0c50dc5207
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Contains data that is only used in AMQMessage transiently, e.g. while the content
+ * body fragments are arriving.
+ *
+ * Having this data stored in a separate class means that the AMQMessage class avoids
+ * the small overhead of numerous guaranteed-null references.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+ /**
+ * Stored temporarily until the header has been received at which point it is used when
+ * constructing the handle
+ */
+ private BasicPublishBody _publishBody;
+
+ /**
+ * Also stored temporarily.
+ */
+ private ContentHeaderBody _contentHeaderBody;
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ /**
+ * This is stored during routing, to know the queues to which this message should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
+ * by the message handle.
+ */
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+
+ public List<AMQQueue> getDestinationQueues()
+ {
+ return _destinationQueues;
+ }
+
+ public void setDestinationQueues(List<AMQQueue> destinationQueues)
+ {
+ _destinationQueues = destinationQueues;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public long getBodyLengthReceived()
+ {
+ return _bodyLengthReceived;
+ }
+
+ public void addBodyLength(int value)
+ {
+ _bodyLengthReceived += value;
+ }
+
+ public boolean isAllContentReceived() throws AMQException
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public void addDestinationQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ }
+
+ public boolean isPersistent()
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index bf82940ec8..ba25ee32d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -41,7 +41,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
{
- ContentHeaderBody chb = _contentHeaderBody.get();
+ ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
if (chb == null)
{
MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
@@ -87,7 +87,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
public BasicPublishBody getPublishBody(long messageId) throws AMQException
{
- BasicPublishBody bpb = _publishBody.get();
+ BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
if (bpb == null)
{
MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
index bb7397f194..06aa144237 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,6 @@ package org.apache.qpid.client.failover;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.failover.FailoverException;
import javax.jms.JMSException;
@@ -56,7 +55,7 @@ public abstract class FailoverSupport
}
catch (FailoverException e)
{
- _log.info("Failover exception caught during operation");
+ _log.info("Failover exception caught during operation: " + e, e);
}
}
}