summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-13 11:24:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-13 11:24:44 +0000
commitbc5378a4b3220ec8c1e700c5fe705d983b4b0c7b (patch)
tree207cb54e9150e7d442611283c20dffae401b3d76
parent90588ea3686949802323f938934fba05e7b97d1c (diff)
downloadqpid-python-bc5378a4b3220ec8c1e700c5fe705d983b4b0c7b.tar.gz
QPID-1629 : Convered AMQMessage to Interface and created concrete Transient/PersistentAMQMessage implementations
Removed the use of WeakReferences from PersistentAMQMessage and therefore the need to have a StoreContext on get requests. NOTE: this checking will break persistent recovery. Coverted all uses of *MessageHandle to AMQMessage. A number of tests (SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message) still use a custom constructor on Transient/PersistentAMQMessage. This is because they have their own Message implemntations that are used for testing. However, I'm sure they could be modified to override the required functionality rather than attempt to use the existing Factory and Wrap the resulting Message. A new JIRA to address this QPID-1659. QPID-1628 : The update to MessageFactory removes the commented out code git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@744079 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java436
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java151
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java)20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java84
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java469
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java223
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java72
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java311
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java48
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java)14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java)40
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java48
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java467
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java22
35 files changed, 1367 insertions, 1428 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5fde08cbdd..3b290b3d51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -35,12 +35,12 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -108,7 +108,7 @@ public class AMQChannel
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private MessageFactory _messageHandleFactory = new MessageFactory();
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 4949e5b41d..ea94f23ff9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -28,7 +28,6 @@ package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -100,7 +93,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -126,8 +119,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
@@ -135,7 +126,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -179,7 +170,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
+ return deliverBody.generateFrame(channelId);
}
private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
+
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 00a15d2d50..b71b118275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -28,9 +28,7 @@ import java.util.Iterator;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -99,7 +92,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
@@ -111,7 +104,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -123,9 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
+ return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
}
@@ -133,15 +124,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -190,7 +179,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new AMQBody()
+ return new AMQBody()
{
public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
throw new AMQException("This block should never be dispatched!");
}
};
- return returnBlock;
}
private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d73d37f48d..2bd6e612f8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,363 +20,52 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
+import org.apache.qpid.AMQException;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
-public class AMQMessage
+public interface AMQMessage
{
- /** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
-
- private final AtomicInteger _referenceCount = new AtomicInteger(1);
-
- private final AMQMessageHandle _messageHandle;
-
- /** Holds the transactional context in which this message is being processed. */
- private StoreContext _storeContext;
-
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
- private byte _flags = 0;
-
- private long _expiration;
-
- private final long _size;
-
- private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
-
-
- /**
- * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
- * therefore is memory-efficient.
- */
- private class BodyFrameIterator implements Iterator<AMQDataBlock>
- {
- private int _channel;
-
- private int _index = -1;
- private AMQProtocolSession _protocolSession;
-
- private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- _channel = channel;
- _protocolSession = protocolSession;
- }
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Unable to get body count: " + e, e);
-
- return false;
- }
- }
-
- public AMQDataBlock next()
- {
- try
- {
-
- AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- ++_index));
-
- return new AMQFrame(_channel, cb);
- }
- catch (AMQException e)
- {
- // have no choice but to throw a runtime exception
- throw new RuntimeException("Error getting content body: " + e, e);
- }
-
- }
-
- private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
- {
- return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- public void clearStoreContext()
- {
- _storeContext = new StoreContext();
- }
-
- public StoreContext getStoreContext()
- {
- return _storeContext;
- }
-
- private class BodyContentIterator implements Iterator<ContentChunk>
- {
-
- private int _index = -1;
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Error getting body count: " + e, e);
-
- return false;
- }
- }
-
- public ContentChunk next()
- {
- try
- {
- return _messageHandle.getContentChunk(getStoreContext(), ++_index);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error getting content body: " + e, e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageId
- * @param store
- * @param factory
- *
- * @throws AMQException
- */
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws AMQException
- {
- _messageHandle = factory.createMessageHandle(messageId, store, true);
- _storeContext = txnConext.getStoreContext();
- _size = _messageHandle.getBodySize(txnConext.getStoreContext());
- }
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageHandle
- *
- * @throws AMQException
- */
- public AMQMessage(
- AMQMessageHandle messageHandle,
- StoreContext storeConext,
- MessagePublishInfo info)
- throws AMQException
- {
- _messageHandle = messageHandle;
- _storeContext = storeConext;
-
- if(info.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
- _size = messageHandle.getBodySize(storeConext);
-
- }
+ //Get Content relating to this message
+ Long getMessageId();
- protected AMQMessage(AMQMessage msg) throws AMQException
- {
- _messageHandle = msg._messageHandle;
- _storeContext = msg._storeContext;
- _flags = msg._flags;
- _size = msg._size;
+ Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel);
- }
+ Iterator<ContentChunk> getContentBodyIterator();
+ ContentHeaderBody getContentHeaderBody();
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
+ ContentChunk getContentChunk(int index);
- public void setExpiration(final long expiration)
- {
+ Object getPublisherClientInstance();
- _expiration = expiration;
+ Object getPublisherIdentifier();
- }
+ MessagePublishInfo getMessagePublishInfo();
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
+ int getBodyCount();
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel);
- }
+ long getSize();
- public Iterator<ContentChunk> getContentBodyIterator()
- {
- return new BodyContentIterator();
- }
+ long getArrivalTime();
- public ContentHeaderBody getContentHeaderBody() throws AMQException
- {
- return _messageHandle.getContentHeaderBody(getStoreContext());
- }
-
- public Long getMessageId()
- {
- return _messageHandle.getMessageId();
- }
-
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if(_referenceCount.addAndGet(count) <= 1)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- try
- {
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_messageHandle != null)
- {
- _messageHandle.removeMessage(storeContext);
- }
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference();
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
+ //Check the status of this message
/**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
*/
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- public boolean isPersistent() throws AMQException
- {
- return _messageHandle.isPersistent();
- }
+ boolean getDeliveredToConsumer();
/**
* Called to enforce the 'immediate' flag.
@@ -384,89 +73,62 @@ public class AMQMessage
* @returns true if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public boolean immediateAndNotDelivered()
- {
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
- }
-
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
- {
- return _messageHandle.getMessagePublishInfo(getStoreContext());
- }
-
- public long getArrivalTime()
- {
- return _messageHandle.getArrivalTime();
- }
+ boolean immediateAndNotDelivered();
/**
* Checks to see if the message has expired. If it has the message is dequeued.
*
- * @param queue The queue to check the expiration against. (Currently not used)
- *
* @return true if the message has expire
*
- * @throws AMQException
+ * @throws org.apache.qpid.AMQException
*/
- public boolean expired(AMQQueue queue) throws AMQException
- {
+ boolean expired() throws AMQException;
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
+ /** Is this a persistent message
+ *
+ * @return true if the message is persistent
+ */
+ boolean isPersistent();
- return false;
- }
/**
* Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
* And for selector efficiency.
*/
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
+ void setDeliveredToConsumer();
+
+ void setExpiration(long expiration);
+
+ void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws org.apache.qpid.AMQException
+ */
+ void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody)
+ throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException;
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
+ void removeMessage(StoreContext storeContext) throws AMQException;
- public long getSize()
- {
- return _size;
+ String toString();
- }
+ String debugIdentity();
- public Object getPublisherClientInstance()
- {
- return _sessionIdentifier.getSessionInstance();
- }
-
- public Object getPublisherIdentifier()
- {
- return _sessionIdentifier.getSessionIdentifier();
- }
+ // Reference counting methods
- public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
- {
- _sessionIdentifier = sessionIdentifier;
- }
+ void decrementReference(StoreContext storeContext) throws MessageCleanupException;
+ boolean incrementReference(int queueCount);
- public String toString()
- {
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
+ boolean incrementReference();
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
- }
+ AMQMessage takeReference();
+ boolean isReferenced();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index f5853bd303..a08719875d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -353,29 +353,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
- try
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
{
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
- }
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
/**
@@ -392,27 +383,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- try
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- // Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
- {
- QueueEntry queueEntry = list.get(i - 1);
- AMQMessage msg = queueEntry.getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
- queueEntry.isRedelivered() };
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating message contents: " + e);
- jme.initCause(e);
- throw jme;
+ QueueEntry queueEntry = list.get(i - 1);
+ AMQMessage msg = queueEntry.getMessage();
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
+ queueEntry.isRedelivered() };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
}
return _messageList;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
deleted file mode 100644
index 1092f67d94..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- */
-public class InMemoryMessageHandle implements AMQMessageHandle
-{
-
- private ContentHeaderBody _contentHeaderBody;
-
- private MessagePublishInfo _messagePublishInfo;
-
- private List<ContentChunk> _contentBodies;
-
- private boolean _redelivered;
-
- private long _arrivalTime;
-
- private final Long _messageId;
-
- public InMemoryMessageHandle(final Long messageId)
- {
- _messageId = messageId;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- return _contentHeaderBody;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- public int getBodyCount(StoreContext context)
- {
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- return _contentBodies.get(index);
- }
-
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
- throws AMQException
- {
- if(_contentBodies == null)
- {
- if(isLastContentBody)
- {
- _contentBodies = Collections.singletonList(contentBody);
- }
- else
- {
- _contentBodies = new ArrayList<ContentChunk>();
- _contentBodies.add(contentBody);
- }
- }
- else
- {
- _contentBodies.add(contentBody);
- }
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- return _messagePublishInfo;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- /**
- * This is called when all the content has been received.
- * @param messagePublishInfo
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- _messagePublishInfo = messagePublishInfo;
- _contentHeaderBody = contentHeaderBody;
- if(contentHeaderBody.bodySize == 0)
- {
- _contentBodies = Collections.EMPTY_LIST;
- }
- _arrivalTime = System.currentTimeMillis();
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- // NO OP
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index b994040131..aad99da6c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -35,7 +35,6 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
{
@@ -48,7 +47,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
+ private AMQMessage _message;
private final Long _messageId;
private final TransactionalContext _txnContext;
@@ -74,7 +73,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
private Exchange _exchange;
-
public IncomingMessage(final Long messageId,
final MessagePublishInfo info,
final TransactionalContext txnContext,
@@ -124,11 +122,11 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
public void routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ final MessageFactory factory) throws AMQException
{
final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+ _message = factory.createMessage(_messageId, store, persistent);
if (persistent)
{
_txnContext.beginTranIfNecessary();
@@ -157,21 +155,16 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
}
- AMQMessage message = null;
-
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
+ _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
+
-
-
- message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
- message.setExpiration(_expiration);
- message.setClientIdentifier(_publisher.getSessionIdentifier());
+ _message.setExpiration(_expiration);
+ _message.setClientIdentifier(_publisher.getSessionIdentifier());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -182,7 +175,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
- throw new UnauthorizedAccessException("Acccess Refused",message);
+ throw new UnauthorizedAccessException("Acccess Refused", _message);
}
if ((_destinationQueues == null) || _destinationQueues.size() == 0)
@@ -190,26 +183,26 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (isMandatory() || isImmediate())
{
- throw new NoRouteException("No Route for message", message);
+ throw new NoRouteException("No Route for message", _message);
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
}
}
else
{
int offset;
final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
+ _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
}
else
{
- offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ offset = ((int)(_message.getMessageId().longValue())) % queueCount;
if(offset < 0)
{
offset = -offset;
@@ -218,22 +211,21 @@ public class IncomingMessage implements Filterable<RuntimeException>
for (int i = offset; i < queueCount; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
for (int i = 0; i < offset; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
}
- message.clearStoreContext();
- return message;
+ return _message;
}
finally
{
// Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
+ if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
}
}
@@ -244,7 +236,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_bodyLengthReceived += contentChunk.getSize();
- _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
+ _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index bdb0707c27..e18834874f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -20,18 +20,22 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-public class MockAMQMessageHandle extends InMemoryMessageHandle
+public class MessageFactory
{
- public MockAMQMessageHandle(final Long messageId)
- {
- super(messageId);
- }
- @Override
- public long getBodySize(StoreContext store)
+ public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
{
- return 0l;
+ if (persistent)
+ {
+ return new PersistentAMQMessage(messageId, store);
+ }
+ else
+ {
+ return new TransientAMQMessage(messageId);
+ }
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6f9efd3200..e33b0c83c7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -47,20 +47,13 @@ public enum NotificationCheck
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
+ long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+ maximumMessageSize + ") breached. [Message ID=" +
+ (msg == null ? "null" : msg.getMessageId()) + "]");
return true;
}
}
@@ -110,7 +103,7 @@ public enum NotificationCheck
}
}
return false;
-
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
new file mode 100644
index 0000000000..04e3635f92
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+public class PersistentAMQMessage extends TransientAMQMessage
+{
+ protected MessageStore _messageStore;
+
+ public PersistentAMQMessage(Long messageId, MessageStore store)
+ {
+ super(messageId);
+ _messageStore = store;
+ }
+
+ @Override
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
+ _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+ contentChunk, isLastContentBody);
+ }
+
+ @Override
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
+ MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+ }
+
+ @Override
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ _messageStore.removeMessage(storeContext, _messageId);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public void recoverFromMessageMetaData(MessageMetaData mmd)
+ {
+ _arrivalTime = mmd.getArrivalTime();
+ _contentHeaderBody = mmd.getContentHeaderBody();
+ _messagePublishInfo = mmd.getMessagePublishInfo();
+ }
+
+ public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+ {
+ super.addContentBodyFrame(null, contentChunk, isLastContentBody);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index fd46a8a5ff..7be2827e0f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -54,25 +54,16 @@ public class PriorityQueueList implements QueueEntryList
public QueueEntry add(AMQMessage message)
{
- try
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
-
+ return _priorityLists[index].add(message);
}
public QueueEntry next(QueueEntry node)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fe9686e906..ba14be5580 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -132,7 +132,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ return getMessage().expired();
}
public boolean isAcquired()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
new file mode 100644
index 0000000000..8c62e046f8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A deliverable message.
+ */
+public class TransientAMQMessage implements AMQMessage
+{
+ /** Used for debugging purposes. */
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
+ private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+ protected ContentHeaderBody _contentHeaderBody;
+
+ protected MessagePublishInfo _messagePublishInfo;
+
+ protected List<ContentChunk> _contentBodies;
+
+ protected long _arrivalTime;
+
+ protected final Long _messageId;
+
+
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ private static final byte IMMEDIATE = 0x01;
+
+ /**
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
+ */
+
+ private static final byte DELIVERED_TO_CONSUMER = 0x02;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
+ {
+ private int _channel;
+
+ private int _index = -1;
+ private AMQProtocolSession _protocolSession;
+
+ private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ _channel = channel;
+ _protocolSession = protocolSession;
+ }
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public AMQDataBlock next()
+ {
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
+
+ return new AMQFrame(_channel, cb);
+ }
+
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class BodyContentIterator implements Iterator<ContentChunk>
+ {
+
+ private int _index = -1;
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public ContentChunk next()
+ {
+ return getContentChunk(++_index);
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
+ * These all need refactoring to some sort of MockAMQMessageFactory.
+ */
+ @Deprecated
+ protected TransientAMQMessage(AMQMessage message) throws AMQException
+ {
+ _messageId = message.getMessageId();
+ _flags = ((TransientAMQMessage)message)._flags;
+ _contentHeaderBody = message.getContentHeaderBody();
+ _messagePublishInfo = message.getMessagePublishInfo();
+ }
+
+
+ /**
+ * Normal message creation via the MessageFactory uses this constructor
+ * Package scope limited as MessageFactory should be used
+ * @see MessageFactory
+ *
+ * @param messageId
+ */
+ TransientAMQMessage(Long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
+ }
+
+ public boolean isReferenced()
+ {
+ return _referenceCount.get() > 0;
+ }
+
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ return new BodyFrameIterator(protocolSession, channel);
+ }
+
+ public Iterator<ContentChunk> getContentBodyIterator()
+ {
+ return new BodyContentIterator();
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
+ */
+ public AMQMessage takeReference()
+ {
+ incrementReference(); // _referenceCount.incrementAndGet();
+
+ return this;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 1)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ * @param storeContext
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ {
+
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ try
+ {
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
+ removeMessage(storeContext);
+ }
+ catch (AMQException e)
+ {
+ // to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(getMessageId(), e);
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+
+ /**
+ * Called selectors to determin if the message has already been sent
+ *
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
+ */
+ public boolean immediateAndNotDelivered()
+ {
+
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
+
+ }
+
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
+ */
+ public boolean expired() throws AMQException
+ {
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
+ public void setDeliveredToConsumer()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
+ }
+
+
+ public long getSize()
+ {
+ return _contentHeaderBody.bodySize;
+ }
+
+ public Object getPublisherClientInstance()
+ {
+ return _sessionIdentifier.getSessionInstance();
+ }
+
+ public Object getPublisherIdentifier()
+ {
+ return _sessionIdentifier.getSessionIdentifier();
+ }
+
+ public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
+ {
+ _sessionIdentifier = sessionIdentifier;
+ }
+
+ /** From AMQMessageHandle **/
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public ContentChunk getContentChunk(int index)
+ {
+ if(_contentBodies == null)
+ {
+ throw new RuntimeException("No ContentBody has been set");
+ }
+
+ if (index > _contentBodies.size() - 1 || index < 0)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ return _contentBodies.get(index);
+ }
+
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ if(_contentBodies == null)
+ {
+ if(isLastContentBody)
+ {
+ _contentBodies = Collections.singletonList(contentChunk);
+ }
+ else
+ {
+ _contentBodies = new ArrayList<ContentChunk>();
+ _contentBodies.add(contentChunk);
+ }
+ }
+ else
+ {
+ _contentBodies.add(contentChunk);
+ }
+ }
+
+ public MessagePublishInfo getMessagePublishInfo()
+ {
+ return _messagePublishInfo;
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+
+ if (contentHeaderBody == null)
+ {
+ throw new NullPointerException("HeaderBody cannot be null");
+ }
+
+ if( messagePublishInfo == null)
+ {
+ throw new NullPointerException("PublishInfo cannot be null");
+ }
+
+ _messagePublishInfo = messagePublishInfo;
+ _contentHeaderBody = contentHeaderBody;
+
+
+ if( contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = Collections.EMPTY_LIST;
+ }
+
+ _arrivalTime = System.currentTimeMillis();
+
+ if(messagePublishInfo.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ //no-op
+ }
+
+
+ public String toString()
+ {
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
+
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
deleted file mode 100644
index 804d2c2131..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class WeakReferenceMessageHandle implements AMQMessageHandle
-{
- private WeakReference<ContentHeaderBody> _contentHeaderBody;
-
- private WeakReference<MessagePublishInfo> _messagePublishInfo;
-
- private List<WeakReference<ContentChunk>> _contentBodies;
-
- private boolean _redelivered;
-
- private final MessageStore _messageStore;
-
- private final Long _messageId;
- private long _arrivalTime;
-
- public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
- {
- _messageId = messageId;
- _messageStore = messageStore;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
- if (chb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
- chb = mmd.getContentHeaderBody();
- }
- return chb;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- private MessageMetaData loadMessageMetaData(StoreContext context)
- throws AMQException
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- populateFromMessageMetaData(mmd);
- return mmd;
- }
-
- private void populateFromMessageMetaData(MessageMetaData mmd)
- {
- _arrivalTime = mmd.getArrivalTime();
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
- _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
- }
-
- public int getBodyCount(StoreContext context) throws AMQException
- {
- if (_contentBodies == null)
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- int chunkCount = mmd.getContentChunkCount();
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
- for (int i = 0; i < chunkCount; i++)
- {
- _contentBodies.add(new WeakReference<ContentChunk>(null));
- }
- }
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- WeakReference<ContentChunk> wr = _contentBodies.get(index);
- ContentChunk cb = wr.get();
- if (cb == null)
- {
- cb = _messageStore.getContentBodyChunk(context, _messageId, index);
- _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
- }
- return cb;
- }
-
- /**
- * Content bodies are set <i>before</i> the publish and header frames
- *
- * @param storeContext
- * @param contentChunk
- * @param isLastContentBody
- * @throws AMQException
- */
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
- {
- if (_contentBodies == null && isLastContentBody)
- {
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
- }
- else
- {
- if (_contentBodies == null)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
- }
- _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
- _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
- contentChunk, isLastContentBody);
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
- if (bpb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
-
- bpb = mmd.getMessagePublishInfo();
- }
- return bpb;
- }
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- /**
- * This is called when all the content has been received.
- *
- * @param publishBody
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- // if there are no content bodies the list will be null so we must
- // create en empty list here
- if (contentHeaderBody.bodySize == 0)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
-
- final long arrivalTime = System.currentTimeMillis();
-
- MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), arrivalTime);
-
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
-
-
- populateFromMessageMetaData(mmd);
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _messageStore.removeMessage(storeContext, _messageId);
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index f23983641b..9de2d09b8e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -93,7 +93,7 @@ public class DerbyMessageStore implements MessageStore
private String _connectionURL;
-
+ MessageFactory _messageFactory;
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
@@ -167,6 +167,8 @@ public class DerbyMessageStore implements MessageStore
// this recovers durable queues and persistent messages
+ _messageFactory = new MessageFactory();
+
recover();
stateTransition(State.RECOVERING, State.STARTED);
@@ -1299,7 +1301,7 @@ public class DerbyMessageStore implements MessageStore
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
throws SQLException, AMQException
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+ Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1318,8 +1320,6 @@ public class DerbyMessageStore implements MessageStore
conn = newConnection();
}
-
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
@@ -1355,7 +1355,11 @@ public class DerbyMessageStore implements MessageStore
}
else
{
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+ message = _messageFactory.createMessage(messageId, this, true);
+
+ _logger.error("todo must do message recovery now.");
+ //todo must do message recovery now.
+
msgMap.put(messageId,message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index b5a91c8da6..d46ba85069 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -349,32 +348,15 @@ public class Show extends AbstractCommand
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
isredelivered.add(entry.isRedelivered() ? "true" : "false");
isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
-// msg.getMessageHandle();
-
BasicContentHeaderProperties headers = null;
- try
- {
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
- }
- catch (AMQException e)
- {
- //ignore
-// commandError("Unable to read properties for message: " + e.getMessage(), null);
- }
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
if (headers != null)
{
@@ -414,15 +396,7 @@ public class Show extends AbstractCommand
AMQShortString useridSS = headers.getUserId();
userid.add(useridSS == null ? "null" : useridSS.toString());
- MessagePublishInfo info = null;
- try
- {
- info = msg.getMessagePublishInfo();
- }
- catch (AMQException e)
- {
- //ignore
- }
+ MessagePublishInfo info = msg.getMessagePublishInfo();
if (info != null)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 5fbf9484f7..2a97db6066 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -22,14 +22,13 @@ package org.apache.qpid.server;
import junit.framework.TestCase;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.queue.MockAMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryIterator;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -38,7 +37,6 @@ import org.apache.qpid.AMQException;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.Iterator;
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -62,7 +60,7 @@ public class ExtractResendAndRequeueTest extends TestCase
UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue();
+ private AMQQueue _queue = new MockAMQQueue("ExtractResendAndRequeueTest");
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index a705c8bbb4..228c99dcbd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -113,6 +113,8 @@ public class TxAckTest extends TestCase
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
+ private static final int MESSAGE_SIZE=100;
+
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
@@ -128,7 +130,12 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+ message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
_map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
}
_acked = acked;
@@ -190,16 +197,15 @@ public class TxAckTest extends TestCase
}
}
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
{
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId,
+ null,
+ false);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
+ // Safe to use null here as we just created a TransientMessage above
+ amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
{
public int getSize()
{
@@ -213,11 +219,11 @@ public class TxAckTest extends TestCase
}
- return amqMessageHandle;
+ return amqMessage;
}
- private class TestMessage extends AMQMessage
+ private class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
@@ -225,7 +231,7 @@ public class TxAckTest extends TestCase
TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ super(createMessage(messageId, publishBody));
_tag = tag;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 883a712bef..e0a4357990 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
@@ -54,7 +55,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private StoreContext _storeContext = new StoreContext();
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
+ private MessageFactory _handleFactory = new MessageFactory();
private int count;
@@ -370,7 +371,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
/**
* Just add some extra utility methods to AMQMessage to aid testing.
*/
- static class Message extends AMQMessage
+ static class Message extends PersistentAMQMessage
{
private class TestIncomingMessage extends IncomingMessage
{
@@ -392,14 +393,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
public ContentHeaderBody getContentHeaderBody()
{
- try
- {
- return Message.this.getContentHeaderBody();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
+ return Message.this.getContentHeaderBody();
}
}
@@ -407,10 +401,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private static MessageStore _messageStore = new SkeletonMessageStore();
- private static StoreContext _storeContext = new StoreContext();
-
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -422,7 +413,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
Message(String id, FieldTable headers) throws AMQException
{
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
}
public IncomingMessage getIncomingMessage()
@@ -432,42 +423,35 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private Message(long messageId,
MessagePublishInfo publish,
- ContentHeaderBody header,
- List<ContentBody> bodies) throws AMQException
- {
- super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
-
-
-
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
- _incoming.setContentHeaderBody(header);
-
-
- }
-
- private static AMQMessageHandle createMessageHandle(final long messageId,
- final MessagePublishInfo publish,
- final ContentHeaderBody header)
+ ContentHeaderBody header) throws AMQException
{
-
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- _messageStore,
- true);
+ super(messageId, _messageStore);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+ setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
}
catch (AMQException e)
{
-
+
}
- return amqMessageHandle;
+
+ _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming.setContentHeaderBody(header);
}
private Message(AMQMessage msg) throws AMQException
{
- super(msg);
+ super(msg.getMessageId(), _messageStore);
+
+ this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
+
+ Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
+
+ while(iterator.hasNext())
+ {
+ this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
+ }
}
@@ -500,15 +484,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private Object getKey()
{
- try
- {
- return getMessagePublishInfo().getRoutingKey();
- }
- catch (AMQException e)
- {
- _log.error("Error getting routing key: " + e, e);
- return null;
- }
+ return getMessagePublishInfo().getRoutingKey();
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index d1a69c9d3c..ddf177690c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, new MessageFactory());
message.deliverToQueues();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index aff7af6952..ffe858f517 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -25,10 +25,12 @@ import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
import junit.framework.AssertionFailedError;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
+ private static final long MESSAGE_SIZE = 100L;
@Override
protected void setUp() throws Exception
@@ -92,11 +94,18 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(Long id, byte i) throws AMQException
{
- AMQMessage msg = super.createMessage(id);
+ AMQMessage message = super.createMessage(id);
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+
+ //The createMessage above is for a Transient Message so it is safe to have no context.
+ message.setPublishAndContentHeaderBody(null, info, header);
+
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
- return msg;
+ message.getContentHeaderBody().properties = props;
+ return message;
}
protected AMQMessage createMessage(Long id) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index fba30528ea..b159e2cda5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -136,6 +136,7 @@ public class AMQQueueAlertTest extends TestCase
while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
{
sendMessages(1, MAX_MESSAGE_SIZE);
+ System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH);
}
Notification lastNotification = _queueMBean.getLastNotification();
@@ -307,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageHandleFactory());
+ messages[i].routingComplete(_messageStore, new MessageFactory());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 38f030f670..a5e2da7b36 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ msg.routingComplete(_messageStore, new MessageFactory());
msg.addContentBodyFrame(new ContentChunk()
{
@@ -305,7 +305,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ currentMessage.routingComplete(_messageStore, new MessageFactory());
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 01674c5b3d..cd1ee65c0c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -98,7 +98,7 @@ public class AckTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
_queue.registerSubscription(_subscription,false);
- MessageHandleFactory factory = new MessageHandleFactory();
+ MessageFactory factory = new MessageFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java
deleted file mode 100644
index cac84c01b4..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-
-public class InMemoryMessageHandleTest extends TestCase
-{
- AMQMessageHandle _handle;
-
- protected AMQMessageHandle newHandle(Long id)
- {
- return new InMemoryMessageHandle(id);
- }
-
- public void testMessageID()
- {
- Long id = 1L;
- _handle = newHandle(id);
-
- assertEquals("Message not set value", id, _handle.getMessageId());
- }
-
- public void testInvalidContentChunk()
- {
- _handle = newHandle(1L);
-
- try
- {
- _handle.getContentChunk(null, 0);
- fail("getContentChunk should not succeed");
- }
- catch (RuntimeException e)
- {
- assertTrue(e.getMessage().equals("No ContentBody has been set"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, false);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- _handle.getContentChunk(null, -1);
- fail("getContentChunk should not succeed");
- }
- catch (IllegalArgumentException e)
- {
- assertTrue(e.getMessage().contains("out of valid range"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- _handle.getContentChunk(null, 1);
- fail("getContentChunk should not succeed");
- }
- catch (IllegalArgumentException e)
- {
- assertTrue(e.getMessage().contains("out of valid range"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
- }
-
- public void testAddSingleContentChunk()
- {
-
- _handle = newHandle(1L);
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 1, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- fail("Exception should prevent adding two final chunks");
- }
- catch (UnsupportedOperationException e)
- {
- //normal path
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- }
-
- public void testAddMultipleContentChunk()
- {
-
- _handle = newHandle(1L);
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, false);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 1, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 2, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 1));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- }
-
- // todo Move test to QueueEntry
-// public void testRedelivered()
-// {
-// _handle = newHandle(1L);
-//
-// assertFalse("New message should not be redelivered", _handle.isRedelivered());
-//
-// _handle.setRedelivered(true);
-//
-// assertTrue("New message should not be redelivered", _handle.isRedelivered());
-// }
-
- public void testInitialArrivalTime()
- {
- _handle = newHandle(1L);
-
- assertEquals("Initial Arrival time should be 0L", 0L, _handle.getArrivalTime());
- }
-
- public void testSetPublishAndContentHeaderBody_WithBody()
- {
- _handle = newHandle(1L);
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl();
- int bodySize = 100;
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize);
-
- try
- {
- _handle.setPublishAndContentHeaderBody(null, mpi, chb);
-
- assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null));
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- public void testSetPublishAndContentHeaderBody_Empty()
- {
- _handle = newHandle(1L);
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl();
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _handle.setPublishAndContentHeaderBody(null, mpi, chb);
-
- assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null));
-
- ContentHeaderBody retreived_chb = _handle.getContentHeaderBody(null);
-
- ContentHeaderProperties chp = retreived_chb.properties;
-
- assertEquals("ContentHeaderBody not correct", chb, retreived_chb);
-
- assertEquals("AppID not correctly retreived", "HandleTest",
- ((BasicContentHeaderProperties) chp).getAppIdAsString());
-
- MessagePublishInfo retreived_mpi = _handle.getMessagePublishInfo(null);
-
- assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi);
-
-
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- public void testIsPersistent()
- {
- _handle = newHandle(1L);
-
- assertFalse(_handle.isPersistent());
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
new file mode 100644
index 0000000000..582e2bfb00
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageFactoryTest extends TestCase
+{
+ private MessageFactory _factory;
+
+ public void setUp()
+ {
+ _factory = new MessageFactory();
+ }
+
+ public void testTransientMessageCreation()
+ {
+ AMQMessage message = _factory.createMessage(0L, null, false);
+
+ assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass());
+ }
+
+ public void testPersistentMessageCreation()
+ {
+ AMQMessage message = _factory.createMessage(0L, null, true);
+
+ assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index a05eb0892b..cc6c486e11 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -22,23 +22,13 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-public class MockAMQMessage extends AMQMessage
+public class MockAMQMessage extends TransientAMQMessage
{
public MockAMQMessage(long messageId)
throws AMQException
{
- super(new MockAMQMessageHandle(messageId) ,
- (StoreContext)null,
- (MessagePublishInfo)new MessagePublishInfoImpl());
- }
-
- protected MockAMQMessage(AMQMessage msg)
- throws AMQException
- {
- super(msg);
+ super(messageId);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 758c8ddb2e..5f1cc81772 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -27,19 +27,16 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
import java.util.List;
import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
public class MockAMQQueue implements AMQQueue
{
private boolean _deleted = false;
+ private int _queueCount;
private AMQShortString _name;
public MockAMQQueue(String name)
@@ -47,11 +44,6 @@ public class MockAMQQueue implements AMQQueue
_name = new AMQShortString(name);
}
- public MockAMQQueue()
- {
-
- }
-
public AMQShortString getName()
{
return _name;
@@ -134,7 +126,7 @@ public class MockAMQQueue implements AMQQueue
public long getQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueCount;
}
public long getReceivedMessageCount()
@@ -159,6 +151,7 @@ public class MockAMQQueue implements AMQQueue
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
+ _queueCount++;
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -169,7 +162,7 @@ public class MockAMQQueue implements AMQQueue
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _queueCount--;
}
public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
index ee85fecfa3..8a9d1ae771 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
@@ -20,14 +20,32 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
+import org.apache.qpid.framing.abstraction.ContentChunk;
public class MockContentChunk implements ContentChunk
{
+ public static final int DEFAULT_SIZE=0;
+
private ByteBuffer _bytebuffer;
private int _size;
+
+
+ public MockContentChunk()
+ {
+ this(0);
+ }
+
+ public MockContentChunk(int size)
+ {
+ FixedSizeByteBufferAllocator allocator = new FixedSizeByteBufferAllocator();
+ _bytebuffer = allocator.allocate(size, false);
+
+ _size = size;
+ }
+
public MockContentChunk(ByteBuffer bytebuffer, int size)
{
_bytebuffer = bytebuffer;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index c6e7e2ebe2..e213be7560 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -21,8 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
-public class WeakMessageHandleTest extends InMemoryMessageHandleTest
+public class PersistentMessageTest extends TransientMessageTest
{
private MemoryMessageStore _messageStore;
@@ -30,19 +31,20 @@ public class WeakMessageHandleTest extends InMemoryMessageHandleTest
{
_messageStore = new MemoryMessageStore();
_messageStore.configure();
+ _storeContext = new StoreContext();
}
- protected AMQMessageHandle newHandle(Long id)
+ @Override
+ protected AMQMessage newMessage(Long id)
{
- return new WeakReferenceMessageHandle(id, _messageStore);
+ return new MessageFactory().createMessage(id, _messageStore, true);
}
@Override
public void testIsPersistent()
{
- _handle = newHandle(1L);
- assertTrue(_handle.isPersistent());
+ _message = newMessage(1L);
+ assertTrue(_message.isPersistent());
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
index 7573a629c1..f7cd860c22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
@@ -20,30 +20,30 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MessageStore;
+import junit.framework.TestCase;
-/**
- * Constructs a message handle based on the publish body, the content header and the queue to which the message
- * has been routed.
- *
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class MessageHandleFactory
+public class QueueEntryImplTest extends TestCase
{
- public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
+ /**
+ * Test the Redelivered state of a QueueEntryImpl
+ */
+ public void testRedelivered()
{
- // just hardcoded for now
- if (persistent)
- {
- return new WeakReferenceMessageHandle(messageId, store);
- }
- else
- {
- return new InMemoryMessageHandle(messageId);
- }
-
-// return new AMQMessage(messageId, store, persistent);
+ QueueEntry entry = new QueueEntryImpl(null, null);
+
+ assertFalse("New message should not be redelivered", entry.isRedelivered());
+
+ entry.setRedelivered(true);
+
+ assertTrue("New message should not be redelivered", entry.isRedelivered());
+
+ //Check we can revert it.. not that we ever should.
+ entry.setRedelivered(false);
+
+ assertFalse("New message should not be redelivered", entry.isRedelivered());
+
}
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 500655c07c..2dcb081739 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -56,7 +56,8 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
-
+ private static final long MESSAGE_SIZE = 100;
+
@Override
protected void setUp() throws Exception
{
@@ -317,7 +318,7 @@ public class SimpleAMQQueueTest extends TestCase
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store, new MessageHandleFactory());
+ msg.routingComplete(_store, new MessageFactory());
_store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
@@ -326,9 +327,14 @@ public class SimpleAMQQueueTest extends TestCase
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+ AMQMessage message = new MessageFactory().createMessage(1L, _store, true);
- entry.setMessage(amqmsg);
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+ // This is a persist message but we are not in a transaction so create a new context for the message
+ message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
+
+ entry.setMessage(message);
_queue.dequeue(null, entry);
// Check that it is dequeued
@@ -338,22 +344,19 @@ public class SimpleAMQQueueTest extends TestCase
// FIXME: move this to somewhere useful
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
{
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
+ //Safe to use a null StoreContext as we have created a TransientMessage (see false param above)
+ amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
}
catch (AMQException e)
{
@@ -361,18 +364,18 @@ public class SimpleAMQQueueTest extends TestCase
}
- return amqMessageHandle;
+ return amqMessage;
}
- public class TestMessage extends AMQMessage
+ public class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ super(createMessage(messageId, publishBody));
_tag = tag;
}
@@ -396,7 +399,8 @@ public class SimpleAMQQueueTest extends TestCase
protected AMQMessage createMessage(Long id) throws AMQException
{
- AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+
+ AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
new file mode 100644
index 0000000000..e37269526c
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -0,0 +1,467 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TransientMessageTest extends TestCase
+{
+ AMQMessage _message;
+ StoreContext _storeContext = null;
+
+ protected AMQMessage newMessage(Long id)
+ {
+ return new MessageFactory().createMessage(id, null, false);
+ }
+
+ public void testMessageID()
+ {
+ Long id = 1L;
+ _message = newMessage(id);
+
+ assertEquals("Message not set value", id, _message.getMessageId());
+ }
+
+ public void testInvalidContentChunk()
+ {
+ _message = newMessage(1L);
+
+ try
+ {
+ _message.getContentChunk(0);
+ fail("getContentChunk should not succeed");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getMessage().equals("No ContentBody has been set"));
+ }
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, false);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ try
+ {
+ _message.getContentChunk(-1);
+ fail("getContentChunk should not succeed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue(e.getMessage().contains("out of valid range"));
+ }
+
+ try
+ {
+ _message.getContentChunk(1);
+ fail("getContentChunk should not succeed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue(e.getMessage().contains("out of valid range"));
+ }
+ }
+
+ public void testAddSingleContentChunk()
+ {
+
+ _message = newMessage(1L);
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0));
+
+ cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ fail("Exception should prevent adding two final chunks");
+ }
+ catch (UnsupportedOperationException e)
+ {
+ //normal path
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ }
+
+ public void testAddMultipleContentChunk()
+ {
+
+ _message = newMessage(1L);
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, false);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0));
+
+ cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 2, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(1));
+
+ }
+
+ public void testInitialArrivalTime()
+ {
+ _message = newMessage(1L);
+
+ assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime());
+ }
+
+ public void testSetPublishAndContentHeaderBody_WithBody()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 100;
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testSetPublishAndContentHeaderBody_Null()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, null);
+ fail("setPublishAndContentHeaderBody with null ContentHeaederBody did not throw NPE.");
+ }
+ catch (NullPointerException npe)
+ {
+ assertEquals("HeaderBody cannot be null", npe.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage());
+ }
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, null, chb);
+ fail("setPublishAndContentHeaderBody with null MessagePublishInfo did not throw NPE.");
+ }
+ catch (NullPointerException npe)
+ {
+ assertEquals("PublishInfo cannot be null", npe.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage());
+ }
+ }
+
+ public void testSetPublishAndContentHeaderBody_Empty()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize());
+
+ ContentHeaderBody retreived_chb = _message.getContentHeaderBody();
+
+ ContentHeaderProperties chp = retreived_chb.properties;
+
+ assertEquals("ContentHeaderBody not correct", chb, retreived_chb);
+
+ assertEquals("AppID not correctly retreived", "HandleTest",
+ ((BasicContentHeaderProperties) chp).getAppIdAsString());
+
+ MessagePublishInfo retreived_mpi = _message.getMessagePublishInfo();
+
+ assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi);
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testIsPersistent()
+ {
+ _message = newMessage(1L);
+
+ assertFalse(_message.isPersistent());
+ }
+
+ public void testImmediateAndNotDelivered()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
+
+ _message.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
+
+ assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNotImmediateAndNotDelivered()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
+
+ _message.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
+
+ assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testExpiry()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+ _message.setExpiration(System.currentTimeMillis() + 10L);
+
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("New messages should not be expired.", _message.expired());
+
+ final long MILLIS =1000000L;
+ long waitTime = 20 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertTrue("After a sleep messages should now be expired.", _message.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+
+ public void testNoExpiry()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("New messages should not be expired.", _message.expired());
+
+ final long MILLIS =1000000L;
+ long waitTime = 10 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 12ed928e7f..b4ed1f8709 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -389,7 +389,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
+ currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory());
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 51820f72dd..9a9fe3644c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
@@ -56,10 +55,9 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
final long messageId = _store.getNewMessageId();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,info);
+
+ AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
@@ -88,18 +86,10 @@ public class TestReferenceCounting extends TestCase
final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,
- info);
-
+ AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
assertEquals(1, _store.getMessageMetaDataMap().size());
message = message.takeReference();