diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:48:28 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:48:28 +0000 |
commit | e2a5f05f47c57dab2b78253a347d9e33e63720e6 (patch) | |
tree | 3ce550431927641672a0ce1066b1ea72411e24e2 /java/broker/src/main/java/org/apache/qpid | |
parent | af1cd20356d35061c74a3f1ba956faaf30933e90 (diff) | |
download | qpid-python-e2a5f05f47c57dab2b78253a347d9e33e63720e6.tar.gz |
QPID-1632 - Initial testing of reference counting and implemntation of TransactionLog based reference counting
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746259 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid')
14 files changed, 67 insertions, 69 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index bb7331e5f7..125518358b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -251,7 +251,7 @@ public class AMQChannel } catch (NoRouteException e) { - //_currentMessage.incrementReference(); + //_currentMessage.takeReference(); _returnMessages.add(e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 3f1947d65a..415f1fe8be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -61,7 +61,8 @@ public abstract class RequiredDeliveryException extends AMQException // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the // handler. So increment here. - _amqMessage = payload.takeReference(); + payload.incrementReference(1); + _amqMessage = payload; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index db3a05eb52..0f40e00624 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -130,7 +130,7 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (QueueEntry msg : _unacked.values()) { - msg.getMessage().takeReference(); + msg.getMessage().incrementReference(1); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 2bd6e612f8..1f56b2ccd2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -126,9 +126,5 @@ public interface AMQMessage boolean incrementReference(int queueCount); - boolean incrementReference(); - - AMQMessage takeReference(); - boolean isReferenced(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index f7ae04c528..5d4322c4fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -78,6 +78,10 @@ public class IncomingMessage implements Filterable<RuntimeException> final AMQProtocolSession publisher, TransactionLog messasgeStore) { + if (publisher == null) + { + throw new NullPointerException("Message Publisher cannot be null"); + } _messagePublishInfo = info; _txnContext = txnContext; _publisher = publisher; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index e5e0b6e312..9924733178 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -42,19 +42,19 @@ public class MessageFactory _messageId = new AtomicLong(0L); } - public void start() + public void recoveryComplete() { _state = State.OPEN; } /** - * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode. - */ - protected void enableRecover() + * Only to be used by tests as this will cause violate the principal that message IDs should not be reused. + */ + public void reset() { _state = State.RECOVER; + _messageId = new AtomicLong(0L); } - /** * Normal message creation path diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index 804bb29ecd..ec48a2afb0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -60,6 +60,7 @@ public class PersistentAMQMessage extends TransientAMQMessage @Override public void removeMessage(StoreContext storeContext) throws AMQException { + _log.info("PAMQM : removing message:" + _messageId); _transactionLog.removeMessage(storeContext, _messageId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ba14be5580..3eb1636884 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -301,10 +301,16 @@ public class QueueEntryImpl implements QueueEntry public void dispose(final StoreContext storeContext) throws MessageCleanupException { + _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state); if(delete()) { + _log.info("QEI delete message:" + getMessage().getMessageId()); getMessage().decrementReference(storeContext); } + else + { + _log.info("QEI delete state wrong:" + getMessage().getMessageId()); + } } public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 42f52ce730..a0f21033c7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -918,7 +918,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted()) { - return entry.getMessage().incrementReference(); + return entry.getMessage().incrementReference(1); } } @@ -1418,7 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - @Override + public void checkMessageStatus() throws AMQException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index 8c62e046f8..f3d74fb01c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -32,19 +32,17 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Collections; -import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; -/** - * A deliverable message. - */ +/** A deliverable message. */ public class TransientAMQMessage implements AMQMessage { /** Used for debugging purposes. */ - private static final Logger _log = Logger.getLogger(AMQMessage.class); + protected static final Logger _log = Logger.getLogger(AMQMessage.class); private final AtomicInteger _referenceCount = new AtomicInteger(1); @@ -58,8 +56,6 @@ public class TransientAMQMessage implements AMQMessage protected final Long _messageId; - - /** Flag to indicate that this message requires 'immediate' delivery. */ private static final byte IMMEDIATE = 0x01; @@ -143,23 +139,23 @@ public class TransientAMQMessage implements AMQMessage /** * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message * These all need refactoring to some sort of MockAMQMessageFactory. - */ + */ @Deprecated protected TransientAMQMessage(AMQMessage message) throws AMQException { _messageId = message.getMessageId(); - _flags = ((TransientAMQMessage)message)._flags; + _flags = ((TransientAMQMessage) message)._flags; _contentHeaderBody = message.getContentHeaderBody(); _messagePublishInfo = message.getMessagePublishInfo(); } - /** * Normal message creation via the MessageFactory uses this constructor * Package scope limited as MessageFactory should be used - * @see MessageFactory * * @param messageId + * + * @see MessageFactory */ TransientAMQMessage(Long messageId) { @@ -191,7 +187,6 @@ public class TransientAMQMessage implements AMQMessage return new BodyContentIterator(); } - public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; @@ -202,32 +197,19 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - /* Threadsafe. Increment the reference count on the message. */ public boolean incrementReference(int count) { - if(_referenceCount.addAndGet(count) <= 1) + if (_referenceCount.addAndGet(count) <= 1) { - _referenceCount.addAndGet(-count); + int newcount = _referenceCount.addAndGet(-count); + _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); return false; } else { + _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" + + _referenceCount.get() + ")"); return true; } @@ -247,6 +229,8 @@ public class TransientAMQMessage implements AMQMessage int count = _referenceCount.decrementAndGet(); + _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); + // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after // the message has been passed to all queues. i.e. we are @@ -256,10 +240,11 @@ public class TransientAMQMessage implements AMQMessage // set the reference count way below 0 so that we can detect that the message has been deleted // this is to guard against the message being spontaneously recreated (from the mgmt console) // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); + _referenceCount.set(Integer.MIN_VALUE / 2); try { + _log.debug("Reference Count hit 0, removing message"); // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op @@ -268,7 +253,7 @@ public class TransientAMQMessage implements AMQMessage catch (AMQException e) { // to maintain consistency, we revert the count - incrementReference(); + incrementReference(1); throw new MessageCleanupException(getMessageId(), e); } } @@ -282,7 +267,6 @@ public class TransientAMQMessage implements AMQMessage } } - /** * Called selectors to determin if the message has already been sent * @@ -296,10 +280,10 @@ public class TransientAMQMessage implements AMQMessage /** * Called to enforce the 'immediate' flag. * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer + * @returns true if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer */ - public boolean immediateAndNotDelivered() + public boolean immediateAndNotDelivered() { return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; @@ -335,7 +319,6 @@ public class TransientAMQMessage implements AMQMessage _flags |= DELIVERED_TO_CONSUMER; } - public long getSize() { return _contentHeaderBody.bodySize; @@ -345,7 +328,7 @@ public class TransientAMQMessage implements AMQMessage { return _sessionIdentifier.getSessionInstance(); } - + public Object getPublisherIdentifier() { return _sessionIdentifier.getSessionIdentifier(); @@ -356,7 +339,7 @@ public class TransientAMQMessage implements AMQMessage _sessionIdentifier = sessionIdentifier; } - /** From AMQMessageHandle **/ + /** From AMQMessageHandle * */ public int getBodyCount() { @@ -365,7 +348,7 @@ public class TransientAMQMessage implements AMQMessage public ContentChunk getContentChunk(int index) { - if(_contentBodies == null) + if (_contentBodies == null) { throw new RuntimeException("No ContentBody has been set"); } @@ -381,9 +364,9 @@ public class TransientAMQMessage implements AMQMessage public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException { - if(_contentBodies == null) + if (_contentBodies == null) { - if(isLastContentBody) + if (isLastContentBody) { _contentBodies = Collections.singletonList(contentChunk); } @@ -411,9 +394,10 @@ public class TransientAMQMessage implements AMQMessage /** * This is called when all the content has been received. + * * @param storeContext - *@param messagePublishInfo - * @param contentHeaderBody @throws AMQException + * @param messagePublishInfo + * @param contentHeaderBody @throws AMQException */ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) @@ -425,7 +409,7 @@ public class TransientAMQMessage implements AMQMessage throw new NullPointerException("HeaderBody cannot be null"); } - if( messagePublishInfo == null) + if (messagePublishInfo == null) { throw new NullPointerException("PublishInfo cannot be null"); } @@ -433,15 +417,14 @@ public class TransientAMQMessage implements AMQMessage _messagePublishInfo = messagePublishInfo; _contentHeaderBody = contentHeaderBody; - - if( contentHeaderBody.bodySize == 0) + if (contentHeaderBody.bodySize == 0) { _contentBodies = Collections.EMPTY_LIST; - } + } _arrivalTime = System.currentTimeMillis(); - if(messagePublishInfo.isImmediate()) + if (messagePublishInfo.isImmediate()) { _flags |= IMMEDIATE; } @@ -457,7 +440,6 @@ public class TransientAMQMessage implements AMQMessage //no-op } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 35d8d9ea6c..fe81346c8c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(); + message.incrementReference(1); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index be11eb7b84..1c58e644e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -594,6 +594,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage protected void sendToClient(final QueueEntry entry, final long deliveryTag) throws AMQException { + _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity()); _deliveryMethod.deliverToClient(this,entry,deliveryTag); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 9bc2e98fe9..8e63b95f0d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,7 +92,7 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(); + _message.incrementReference(1); try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9a4a4c3286..1115ed723c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -238,6 +238,13 @@ public class VirtualHost implements Accessable " does not."); } _transactionLog = (TransactionLog) o; + + //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable. + if (_transactionLog instanceof RoutingTable) + { + _routingTable = (RoutingTable)_transactionLog; + } + _transactionLog.configure(this, "store", config); } @@ -261,9 +268,9 @@ public class VirtualHost implements Accessable } else { - if (_transactionLog instanceof RoutingTable) + if (_routingTable == null) { - _routingTable = (RoutingTable)_transactionLog; + throw new RuntimeException("No Routing Table configured unable to startup."); } } } |