diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:48:28 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:48:28 +0000 |
commit | 0db29114e114d6073494de791785df237cfda475 (patch) | |
tree | 55ba09e99625a2563148ef6989dbe5192c590535 | |
parent | f1e96b7aa7db5ce3aa006d489ffbf16db0e9541c (diff) | |
download | qpid-python-0db29114e114d6073494de791785df237cfda475.tar.gz |
QPID-1632 - Initial testing of reference counting and implemntation of TransactionLog based reference counting
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@746259 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 168 insertions, 267 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index bb7331e5f7..125518358b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -251,7 +251,7 @@ public class AMQChannel } catch (NoRouteException e) { - //_currentMessage.incrementReference(); + //_currentMessage.takeReference(); _returnMessages.add(e); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 3f1947d65a..415f1fe8be 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -61,7 +61,8 @@ public abstract class RequiredDeliveryException extends AMQException // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the // handler. So increment here. - _amqMessage = payload.takeReference(); + payload.incrementReference(1); + _amqMessage = payload; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index db3a05eb52..0f40e00624 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -130,7 +130,7 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (QueueEntry msg : _unacked.values()) { - msg.getMessage().takeReference(); + msg.getMessage().incrementReference(1); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 2bd6e612f8..1f56b2ccd2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -126,9 +126,5 @@ public interface AMQMessage boolean incrementReference(int queueCount); - boolean incrementReference(); - - AMQMessage takeReference(); - boolean isReferenced(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index f7ae04c528..5d4322c4fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -78,6 +78,10 @@ public class IncomingMessage implements Filterable<RuntimeException> final AMQProtocolSession publisher, TransactionLog messasgeStore) { + if (publisher == null) + { + throw new NullPointerException("Message Publisher cannot be null"); + } _messagePublishInfo = info; _txnContext = txnContext; _publisher = publisher; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index e5e0b6e312..9924733178 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -42,19 +42,19 @@ public class MessageFactory _messageId = new AtomicLong(0L); } - public void start() + public void recoveryComplete() { _state = State.OPEN; } /** - * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode. - */ - protected void enableRecover() + * Only to be used by tests as this will cause violate the principal that message IDs should not be reused. + */ + public void reset() { _state = State.RECOVER; + _messageId = new AtomicLong(0L); } - /** * Normal message creation path diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index 804bb29ecd..ec48a2afb0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -60,6 +60,7 @@ public class PersistentAMQMessage extends TransientAMQMessage @Override public void removeMessage(StoreContext storeContext) throws AMQException { + _log.info("PAMQM : removing message:" + _messageId); _transactionLog.removeMessage(storeContext, _messageId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ba14be5580..3eb1636884 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -301,10 +301,16 @@ public class QueueEntryImpl implements QueueEntry public void dispose(final StoreContext storeContext) throws MessageCleanupException { + _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state); if(delete()) { + _log.info("QEI delete message:" + getMessage().getMessageId()); getMessage().decrementReference(storeContext); } + else + { + _log.info("QEI delete state wrong:" + getMessage().getMessageId()); + } } public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 42f52ce730..a0f21033c7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -918,7 +918,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted()) { - return entry.getMessage().incrementReference(); + return entry.getMessage().incrementReference(1); } } @@ -1418,7 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - @Override + public void checkMessageStatus() throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index 8c62e046f8..f3d74fb01c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -32,19 +32,17 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Collections; -import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; -/** - * A deliverable message. - */ +/** A deliverable message. */ public class TransientAMQMessage implements AMQMessage { /** Used for debugging purposes. */ - private static final Logger _log = Logger.getLogger(AMQMessage.class); + protected static final Logger _log = Logger.getLogger(AMQMessage.class); private final AtomicInteger _referenceCount = new AtomicInteger(1); @@ -58,8 +56,6 @@ public class TransientAMQMessage implements AMQMessage protected final Long _messageId; - - /** Flag to indicate that this message requires 'immediate' delivery. */ private static final byte IMMEDIATE = 0x01; @@ -143,23 +139,23 @@ public class TransientAMQMessage implements AMQMessage /** * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message * These all need refactoring to some sort of MockAMQMessageFactory. - */ + */ @Deprecated protected TransientAMQMessage(AMQMessage message) throws AMQException { _messageId = message.getMessageId(); - _flags = ((TransientAMQMessage)message)._flags; + _flags = ((TransientAMQMessage) message)._flags; _contentHeaderBody = message.getContentHeaderBody(); _messagePublishInfo = message.getMessagePublishInfo(); } - /** * Normal message creation via the MessageFactory uses this constructor * Package scope limited as MessageFactory should be used - * @see MessageFactory * * @param messageId + * + * @see MessageFactory */ TransientAMQMessage(Long messageId) { @@ -191,7 +187,6 @@ public class TransientAMQMessage implements AMQMessage return new BodyContentIterator(); } - public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; @@ -202,32 +197,19 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - /* Threadsafe. Increment the reference count on the message. */ public boolean incrementReference(int count) { - if(_referenceCount.addAndGet(count) <= 1) + if (_referenceCount.addAndGet(count) <= 1) { - _referenceCount.addAndGet(-count); + int newcount = _referenceCount.addAndGet(-count); + _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); return false; } else { + _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" + + _referenceCount.get() + ")"); return true; } @@ -247,6 +229,8 @@ public class TransientAMQMessage implements AMQMessage int count = _referenceCount.decrementAndGet(); + _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); + // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after // the message has been passed to all queues. i.e. we are @@ -256,10 +240,11 @@ public class TransientAMQMessage implements AMQMessage // set the reference count way below 0 so that we can detect that the message has been deleted // this is to guard against the message being spontaneously recreated (from the mgmt console) // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); + _referenceCount.set(Integer.MIN_VALUE / 2); try { + _log.debug("Reference Count hit 0, removing message"); // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op @@ -268,7 +253,7 @@ public class TransientAMQMessage implements AMQMessage catch (AMQException e) { // to maintain consistency, we revert the count - incrementReference(); + incrementReference(1); throw new MessageCleanupException(getMessageId(), e); } } @@ -282,7 +267,6 @@ public class TransientAMQMessage implements AMQMessage } } - /** * Called selectors to determin if the message has already been sent * @@ -296,10 +280,10 @@ public class TransientAMQMessage implements AMQMessage /** * Called to enforce the 'immediate' flag. * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer + * @returns true if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer */ - public boolean immediateAndNotDelivered() + public boolean immediateAndNotDelivered() { return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; @@ -335,7 +319,6 @@ public class TransientAMQMessage implements AMQMessage _flags |= DELIVERED_TO_CONSUMER; } - public long getSize() { return _contentHeaderBody.bodySize; @@ -345,7 +328,7 @@ public class TransientAMQMessage implements AMQMessage { return _sessionIdentifier.getSessionInstance(); } - + public Object getPublisherIdentifier() { return _sessionIdentifier.getSessionIdentifier(); @@ -356,7 +339,7 @@ public class TransientAMQMessage implements AMQMessage _sessionIdentifier = sessionIdentifier; } - /** From AMQMessageHandle **/ + /** From AMQMessageHandle * */ public int getBodyCount() { @@ -365,7 +348,7 @@ public class TransientAMQMessage implements AMQMessage public ContentChunk getContentChunk(int index) { - if(_contentBodies == null) + if (_contentBodies == null) { throw new RuntimeException("No ContentBody has been set"); } @@ -381,9 +364,9 @@ public class TransientAMQMessage implements AMQMessage public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException { - if(_contentBodies == null) + if (_contentBodies == null) { - if(isLastContentBody) + if (isLastContentBody) { _contentBodies = Collections.singletonList(contentChunk); } @@ -411,9 +394,10 @@ public class TransientAMQMessage implements AMQMessage /** * This is called when all the content has been received. + * * @param storeContext - *@param messagePublishInfo - * @param contentHeaderBody @throws AMQException + * @param messagePublishInfo + * @param contentHeaderBody @throws AMQException */ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) @@ -425,7 +409,7 @@ public class TransientAMQMessage implements AMQMessage throw new NullPointerException("HeaderBody cannot be null"); } - if( messagePublishInfo == null) + if (messagePublishInfo == null) { throw new NullPointerException("PublishInfo cannot be null"); } @@ -433,15 +417,14 @@ public class TransientAMQMessage implements AMQMessage _messagePublishInfo = messagePublishInfo; _contentHeaderBody = contentHeaderBody; - - if( contentHeaderBody.bodySize == 0) + if (contentHeaderBody.bodySize == 0) { _contentBodies = Collections.EMPTY_LIST; - } + } _arrivalTime = System.currentTimeMillis(); - if(messagePublishInfo.isImmediate()) + if (messagePublishInfo.isImmediate()) { _flags |= IMMEDIATE; } @@ -457,7 +440,6 @@ public class TransientAMQMessage implements AMQMessage //no-op } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 35d8d9ea6c..fe81346c8c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(); + message.incrementReference(1); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index be11eb7b84..1c58e644e9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -594,6 +594,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage protected void sendToClient(final QueueEntry entry, final long deliveryTag) throws AMQException { + _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity()); _deliveryMethod.deliverToClient(this,entry,deliveryTag); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 9bc2e98fe9..8e63b95f0d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,7 +92,7 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(); + _message.incrementReference(1); try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9a4a4c3286..1115ed723c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -238,6 +238,13 @@ public class VirtualHost implements Accessable " does not."); } _transactionLog = (TransactionLog) o; + + //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable. + if (_transactionLog instanceof RoutingTable) + { + _routingTable = (RoutingTable)_transactionLog; + } + _transactionLog.configure(this, "store", config); } @@ -261,9 +268,9 @@ public class VirtualHost implements Accessable } else { - if (_transactionLog instanceof RoutingTable) + if (_routingTable == null) { - _routingTable = (RoutingTable)_transactionLog; + throw new RuntimeException("No Routing Table configured unable to startup."); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 8293240905..1d729a82a5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -242,9 +242,9 @@ public class TxAckTest extends TestCase } - public boolean incrementReference() + public boolean incrementReference(int count) { - _count++; + _count+=count; return true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java index db0fc56303..a272da88ac 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java @@ -29,16 +29,15 @@ public class MessageFactoryRecoveryTest extends TestCase public void setUp() { _factory = MessageFactory.getInstance(); - + _factory.reset(); } public void test() { - AMQMessage message = _factory.createMessage(null, false); - - _factory.enableRecover(); - Long messasgeID = message.getMessageId(); + Long messasgeID = 1L; + //Create initial message + _factory.createMessage(messasgeID, null); try { @@ -67,7 +66,7 @@ public class MessageFactoryRecoveryTest extends TestCase messasgeID += 100; try { - message = _factory.createMessage(messasgeID, null); + AMQMessage message = _factory.createMessage(messasgeID, null); assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); } catch (Exception re) @@ -76,7 +75,7 @@ public class MessageFactoryRecoveryTest extends TestCase } // End the reovery process. - _factory.start(); + _factory.recoveryComplete(); //Check we cannot still create by id after ending recovery phase try @@ -96,7 +95,7 @@ public class MessageFactoryRecoveryTest extends TestCase try { - message = _factory.createMessage(null, false); + AMQMessage message = _factory.createMessage(null, false); assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); } catch (Exception re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java new file mode 100644 index 0000000000..44e9851db7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; + +public class MessageReferenceCountingTest extends TestCase +{ + AMQMessage _message; + + public void setUp() + { + _message = MessageFactory.getInstance().createMessage(null, false); + } + + public void testInitialState() + { + + assertTrue("New messages should have a reference", _message.isReferenced()); + } + + public void testIncrementReference() + { + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed ",_message.incrementReference(1)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + } + + public void testDecrementReference() + { + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + try + { + _message.decrementReference(null); + } + catch (MessageCleanupException e) + { + fail("Decrement should be allowed:"+e.getMessage()); + } + + assertFalse("Message should not be Referenced state", _message.isReferenced()); + + try + { + _message.decrementReference(null); + fail("Decrement should not be allowed as we should have a ref count of 0"); + } + catch (MessageCleanupException e) + { + assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0")); + } + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index ed7b2923e7..12ff91cdad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -20,193 +20,22 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; - -public class MockQueueEntry implements QueueEntry +public class MockQueueEntry extends QueueEntryImpl { + static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue")); - private AMQMessage _message; - private boolean _redelivered; - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public String debugIdentity() - { - return null; - } - - public boolean delete() - { - return false; - } - - public void dequeue(StoreContext storeContext) throws FailedDequeueException - { - - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException - { - - } - - public void dispose(StoreContext storeContext) throws MessageCleanupException - { - - } - - public boolean expired() throws AMQException - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public AMQMessage getMessage() - { - return _message; - } - - public AMQQueue getQueue() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean immediateAndNotDelivered() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - - public boolean isQueueDeleted() - { - - return false; - } - - - public boolean isRejectedBy(Subscription subscription) - { - - return false; - } - - - public void reject() - { - - - } - - - public void reject(Subscription subscription) - { - - - } - - - public void release() - { - - - } - - - public boolean removeStateChangeListener(StateChangeListener listener) - { - - return false; - } - - - public void requeue(StoreContext storeContext) throws AMQException - { - - - } - - - public void setDeliveredToSubscription() - { - - - } - - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - - - public int compareTo(QueueEntry o) - { - - return 0; - } - - public void setMessage(AMQMessage msg) - { - _message = msg; - } - - public ContentHeaderBody getContentHeaderBody() throws AMQException + public MockQueueEntry() { - return _message.getContentHeaderBody(); + super(_defaultList); } - public boolean isPersistent() throws AMQException + public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message) { - return _message.isPersistent(); + super(queueEntryList, message); } - public boolean isRedelivered() + public MockQueueEntry(AMQMessage message) { - return _redelivered; + super(_defaultList, message); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f95b1eb83e..665ca089da 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -296,7 +296,7 @@ public class SimpleAMQQueueTest extends TestCase public void testGetLastFiveMessageIds() throws Exception { AMQMessage message = createMessage(); - Long messageIdOffset = message.getMessageId() -1 ; + Long messageIdOffset = message.getMessageId() - 1; for (int i = 0; i < 10; i++) { // Put message on queue @@ -335,7 +335,6 @@ public class SimpleAMQQueueTest extends TestCase msg.enqueue(qs); msg.routingComplete(_store); - _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued @@ -343,14 +342,13 @@ public class SimpleAMQQueueTest extends TestCase assertNotNull(data); // Dequeue message - MockQueueEntry entry = new MockQueueEntry(); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - entry.setMessage(message); + MockQueueEntry entry = new MockQueueEntry(message); _queue.dequeue(null, entry); // Check that it is dequeued @@ -408,9 +406,9 @@ public class SimpleAMQQueueTest extends TestCase _tag = getMessageId(); } - public boolean incrementReference() + public boolean incrementReference(int count) { - _count++; + _count+=count; return true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 48d69c5bad..e8acfc2fda 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -59,7 +59,7 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message = message.takeReference(); + message.incrementReference(1); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); @@ -89,10 +89,10 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message = message.takeReference(); + message.incrementReference(1); assertEquals(1, _store.getMessageMetaDataMap().size()); - message = message.takeReference(); + message.incrementReference(1); message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } |