summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-20 14:48:28 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-20 14:48:28 +0000
commit0db29114e114d6073494de791785df237cfda475 (patch)
tree55ba09e99625a2563148ef6989dbe5192c590535
parentf1e96b7aa7db5ce3aa006d489ffbf16db0e9541c (diff)
downloadqpid-python-0db29114e114d6073494de791785df237cfda475.tar.gz
QPID-1632 - Initial testing of reference counting and implemntation of TransactionLog based reference counting
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@746259 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java84
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java77
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java187
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java6
20 files changed, 168 insertions, 267 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index bb7331e5f7..125518358b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -251,7 +251,7 @@ public class AMQChannel
}
catch (NoRouteException e)
{
- //_currentMessage.incrementReference();
+ //_currentMessage.takeReference();
_returnMessages.add(e);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 3f1947d65a..415f1fe8be 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -61,7 +61,8 @@ public abstract class RequiredDeliveryException extends AMQException
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
- _amqMessage = payload.takeReference();
+ payload.incrementReference(1);
+ _amqMessage = payload;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index db3a05eb52..0f40e00624 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -130,7 +130,7 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (QueueEntry msg : _unacked.values())
{
- msg.getMessage().takeReference();
+ msg.getMessage().incrementReference(1);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 2bd6e612f8..1f56b2ccd2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -126,9 +126,5 @@ public interface AMQMessage
boolean incrementReference(int queueCount);
- boolean incrementReference();
-
- AMQMessage takeReference();
-
boolean isReferenced();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index f7ae04c528..5d4322c4fc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -78,6 +78,10 @@ public class IncomingMessage implements Filterable<RuntimeException>
final AMQProtocolSession publisher,
TransactionLog messasgeStore)
{
+ if (publisher == null)
+ {
+ throw new NullPointerException("Message Publisher cannot be null");
+ }
_messagePublishInfo = info;
_txnContext = txnContext;
_publisher = publisher;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index e5e0b6e312..9924733178 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -42,19 +42,19 @@ public class MessageFactory
_messageId = new AtomicLong(0L);
}
- public void start()
+ public void recoveryComplete()
{
_state = State.OPEN;
}
/**
- * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
- */
- protected void enableRecover()
+ * Only to be used by tests as this will cause violate the principal that message IDs should not be reused.
+ */
+ public void reset()
{
_state = State.RECOVER;
+ _messageId = new AtomicLong(0L);
}
-
/**
* Normal message creation path
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index 804bb29ecd..ec48a2afb0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -60,6 +60,7 @@ public class PersistentAMQMessage extends TransientAMQMessage
@Override
public void removeMessage(StoreContext storeContext) throws AMQException
{
+ _log.info("PAMQM : removing message:" + _messageId);
_transactionLog.removeMessage(storeContext, _messageId);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ba14be5580..3eb1636884 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -301,10 +301,16 @@ public class QueueEntryImpl implements QueueEntry
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
+ _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
if(delete())
{
+ _log.info("QEI delete message:" + getMessage().getMessageId());
getMessage().decrementReference(storeContext);
}
+ else
+ {
+ _log.info("QEI delete state wrong:" + getMessage().getMessageId());
+ }
}
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 42f52ce730..a0f21033c7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -918,7 +918,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference();
+ return entry.getMessage().incrementReference(1);
}
}
@@ -1418,7 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index 8c62e046f8..f3d74fb01c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -32,19 +32,17 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
+/** A deliverable message. */
public class TransientAMQMessage implements AMQMessage
{
/** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
+ protected static final Logger _log = Logger.getLogger(AMQMessage.class);
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -58,8 +56,6 @@ public class TransientAMQMessage implements AMQMessage
protected final Long _messageId;
-
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -143,23 +139,23 @@ public class TransientAMQMessage implements AMQMessage
/**
* Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
* These all need refactoring to some sort of MockAMQMessageFactory.
- */
+ */
@Deprecated
protected TransientAMQMessage(AMQMessage message) throws AMQException
{
_messageId = message.getMessageId();
- _flags = ((TransientAMQMessage)message)._flags;
+ _flags = ((TransientAMQMessage) message)._flags;
_contentHeaderBody = message.getContentHeaderBody();
_messagePublishInfo = message.getMessagePublishInfo();
}
-
/**
* Normal message creation via the MessageFactory uses this constructor
* Package scope limited as MessageFactory should be used
- * @see MessageFactory
*
* @param messageId
+ *
+ * @see MessageFactory
*/
TransientAMQMessage(Long messageId)
{
@@ -191,7 +187,6 @@ public class TransientAMQMessage implements AMQMessage
return new BodyContentIterator();
}
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
@@ -202,32 +197,19 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
/* Threadsafe. Increment the reference count on the message. */
public boolean incrementReference(int count)
{
- if(_referenceCount.addAndGet(count) <= 1)
+ if (_referenceCount.addAndGet(count) <= 1)
{
- _referenceCount.addAndGet(-count);
+ int newcount = _referenceCount.addAndGet(-count);
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
return false;
}
else
{
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
+ + _referenceCount.get() + ")");
return true;
}
@@ -247,6 +229,8 @@ public class TransientAMQMessage implements AMQMessage
int count = _referenceCount.decrementAndGet();
+ _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
@@ -256,10 +240,11 @@ public class TransientAMQMessage implements AMQMessage
// set the reference count way below 0 so that we can detect that the message has been deleted
// this is to guard against the message being spontaneously recreated (from the mgmt console)
// by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
+ _referenceCount.set(Integer.MIN_VALUE / 2);
try
{
+ _log.debug("Reference Count hit 0, removing message");
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
// no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
@@ -268,7 +253,7 @@ public class TransientAMQMessage implements AMQMessage
catch (AMQException e)
{
// to maintain consistency, we revert the count
- incrementReference();
+ incrementReference(1);
throw new MessageCleanupException(getMessageId(), e);
}
}
@@ -282,7 +267,6 @@ public class TransientAMQMessage implements AMQMessage
}
}
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -296,10 +280,10 @@ public class TransientAMQMessage implements AMQMessage
/**
* Called to enforce the 'immediate' flag.
*
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
@@ -335,7 +319,6 @@ public class TransientAMQMessage implements AMQMessage
_flags |= DELIVERED_TO_CONSUMER;
}
-
public long getSize()
{
return _contentHeaderBody.bodySize;
@@ -345,7 +328,7 @@ public class TransientAMQMessage implements AMQMessage
{
return _sessionIdentifier.getSessionInstance();
}
-
+
public Object getPublisherIdentifier()
{
return _sessionIdentifier.getSessionIdentifier();
@@ -356,7 +339,7 @@ public class TransientAMQMessage implements AMQMessage
_sessionIdentifier = sessionIdentifier;
}
- /** From AMQMessageHandle **/
+ /** From AMQMessageHandle * */
public int getBodyCount()
{
@@ -365,7 +348,7 @@ public class TransientAMQMessage implements AMQMessage
public ContentChunk getContentChunk(int index)
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
throw new RuntimeException("No ContentBody has been set");
}
@@ -381,9 +364,9 @@ public class TransientAMQMessage implements AMQMessage
public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
throws AMQException
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
- if(isLastContentBody)
+ if (isLastContentBody)
{
_contentBodies = Collections.singletonList(contentChunk);
}
@@ -411,9 +394,10 @@ public class TransientAMQMessage implements AMQMessage
/**
* This is called when all the content has been received.
+ *
* @param storeContext
- *@param messagePublishInfo
- * @param contentHeaderBody @throws AMQException
+ * @param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
*/
public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
@@ -425,7 +409,7 @@ public class TransientAMQMessage implements AMQMessage
throw new NullPointerException("HeaderBody cannot be null");
}
- if( messagePublishInfo == null)
+ if (messagePublishInfo == null)
{
throw new NullPointerException("PublishInfo cannot be null");
}
@@ -433,15 +417,14 @@ public class TransientAMQMessage implements AMQMessage
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
-
- if( contentHeaderBody.bodySize == 0)
+ if (contentHeaderBody.bodySize == 0)
{
_contentBodies = Collections.EMPTY_LIST;
- }
+ }
_arrivalTime = System.currentTimeMillis();
- if(messagePublishInfo.isImmediate())
+ if (messagePublishInfo.isImmediate())
{
_flags |= IMMEDIATE;
}
@@ -457,7 +440,6 @@ public class TransientAMQMessage implements AMQMessage
//no-op
}
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 35d8d9ea6c..fe81346c8c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1357,7 +1357,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
if(message != null)
{
- message.incrementReference();
+ message.incrementReference(1);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index be11eb7b84..1c58e644e9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -594,6 +594,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
protected void sendToClient(final QueueEntry entry, final long deliveryTag)
throws AMQException
{
+ _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
_deliveryMethod.deliverToClient(this,entry,deliveryTag);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 9bc2e98fe9..8e63b95f0d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -92,7 +92,7 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference();
+ _message.incrementReference(1);
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 9a4a4c3286..1115ed723c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -238,6 +238,13 @@ public class VirtualHost implements Accessable
" does not.");
}
_transactionLog = (TransactionLog) o;
+
+ //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable)_transactionLog;
+ }
+
_transactionLog.configure(this, "store", config);
}
@@ -261,9 +268,9 @@ public class VirtualHost implements Accessable
}
else
{
- if (_transactionLog instanceof RoutingTable)
+ if (_routingTable == null)
{
- _routingTable = (RoutingTable)_transactionLog;
+ throw new RuntimeException("No Routing Table configured unable to startup.");
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 8293240905..1d729a82a5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -242,9 +242,9 @@ public class TxAckTest extends TestCase
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
index db0fc56303..a272da88ac 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
@@ -29,16 +29,15 @@ public class MessageFactoryRecoveryTest extends TestCase
public void setUp()
{
_factory = MessageFactory.getInstance();
-
+ _factory.reset();
}
public void test()
{
- AMQMessage message = _factory.createMessage(null, false);
-
- _factory.enableRecover();
- Long messasgeID = message.getMessageId();
+ Long messasgeID = 1L;
+ //Create initial message
+ _factory.createMessage(messasgeID, null);
try
{
@@ -67,7 +66,7 @@ public class MessageFactoryRecoveryTest extends TestCase
messasgeID += 100;
try
{
- message = _factory.createMessage(messasgeID, null);
+ AMQMessage message = _factory.createMessage(messasgeID, null);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
@@ -76,7 +75,7 @@ public class MessageFactoryRecoveryTest extends TestCase
}
// End the reovery process.
- _factory.start();
+ _factory.recoveryComplete();
//Check we cannot still create by id after ending recovery phase
try
@@ -96,7 +95,7 @@ public class MessageFactoryRecoveryTest extends TestCase
try
{
- message = _factory.createMessage(null, false);
+ AMQMessage message = _factory.createMessage(null, false);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
new file mode 100644
index 0000000000..44e9851db7
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageReferenceCountingTest extends TestCase
+{
+ AMQMessage _message;
+
+ public void setUp()
+ {
+ _message = MessageFactory.getInstance().createMessage(null, false);
+ }
+
+ public void testInitialState()
+ {
+
+ assertTrue("New messages should have a reference", _message.isReferenced());
+ }
+
+ public void testIncrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ }
+
+ public void testDecrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ try
+ {
+ _message.decrementReference(null);
+ }
+ catch (MessageCleanupException e)
+ {
+ fail("Decrement should be allowed:"+e.getMessage());
+ }
+
+ assertFalse("Message should not be Referenced state", _message.isReferenced());
+
+ try
+ {
+ _message.decrementReference(null);
+ fail("Decrement should not be allowed as we should have a ref count of 0");
+ }
+ catch (MessageCleanupException e)
+ {
+ assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index ed7b2923e7..12ff91cdad 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -20,193 +20,22 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-public class MockQueueEntry implements QueueEntry
+public class MockQueueEntry extends QueueEntryImpl
{
+ static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue"));
- private AMQMessage _message;
- private boolean _redelivered;
-
- public boolean acquire()
- {
- return false;
- }
-
- public boolean acquire(Subscription sub)
- {
- return false;
- }
-
- public boolean acquiredBySubscription()
- {
- return false;
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
-
- }
-
- public String debugIdentity()
- {
- return null;
- }
-
- public boolean delete()
- {
- return false;
- }
-
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
- {
-
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
- {
-
- }
-
- public void dispose(StoreContext storeContext) throws MessageCleanupException
- {
-
- }
-
- public boolean expired() throws AMQException
- {
- return false;
- }
-
- public Subscription getDeliveredSubscription()
- {
- return null;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return false;
- }
-
- public AMQMessage getMessage()
- {
- return _message;
- }
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean immediateAndNotDelivered()
- {
- return false;
- }
-
- public boolean isAcquired()
- {
- return false;
- }
-
- public boolean isDeleted()
- {
- return false;
- }
-
-
- public boolean isQueueDeleted()
- {
-
- return false;
- }
-
-
- public boolean isRejectedBy(Subscription subscription)
- {
-
- return false;
- }
-
-
- public void reject()
- {
-
-
- }
-
-
- public void reject(Subscription subscription)
- {
-
-
- }
-
-
- public void release()
- {
-
-
- }
-
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
-
- return false;
- }
-
-
- public void requeue(StoreContext storeContext) throws AMQException
- {
-
-
- }
-
-
- public void setDeliveredToSubscription()
- {
-
-
- }
-
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
-
- public int compareTo(QueueEntry o)
- {
-
- return 0;
- }
-
- public void setMessage(AMQMessage msg)
- {
- _message = msg;
- }
-
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public MockQueueEntry()
{
- return _message.getContentHeaderBody();
+ super(_defaultList);
}
- public boolean isPersistent() throws AMQException
+ public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
- return _message.isPersistent();
+ super(queueEntryList, message);
}
- public boolean isRedelivered()
+ public MockQueueEntry(AMQMessage message)
{
- return _redelivered;
+ super(_defaultList, message);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f95b1eb83e..665ca089da 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -296,7 +296,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testGetLastFiveMessageIds() throws Exception
{
AMQMessage message = createMessage();
- Long messageIdOffset = message.getMessageId() -1 ;
+ Long messageIdOffset = message.getMessageId() - 1;
for (int i = 0; i < 10; i++)
{
// Put message on queue
@@ -335,7 +335,6 @@ public class SimpleAMQQueueTest extends TestCase
msg.enqueue(qs);
msg.routingComplete(_store);
-
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
@@ -343,14 +342,13 @@ public class SimpleAMQQueueTest extends TestCase
assertNotNull(data);
// Dequeue message
- MockQueueEntry entry = new MockQueueEntry();
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- entry.setMessage(message);
+ MockQueueEntry entry = new MockQueueEntry(message);
_queue.dequeue(null, entry);
// Check that it is dequeued
@@ -408,9 +406,9 @@ public class SimpleAMQQueueTest extends TestCase
_tag = getMessageId();
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 48d69c5bad..e8acfc2fda 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -59,7 +59,7 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
@@ -89,10 +89,10 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
assertEquals(1, _store.getMessageMetaDataMap().size());
- message = message.takeReference();
+ message.incrementReference(1);
message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}