summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-20 14:48:28 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-20 14:48:28 +0000
commite2a5f05f47c57dab2b78253a347d9e33e63720e6 (patch)
tree3ce550431927641672a0ce1066b1ea72411e24e2 /java/broker/src/main/java/org/apache/qpid
parentaf1cd20356d35061c74a3f1ba956faaf30933e90 (diff)
downloadqpid-python-e2a5f05f47c57dab2b78253a347d9e33e63720e6.tar.gz
QPID-1632 - Initial testing of reference counting and implemntation of TransactionLog based reference counting
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746259 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java84
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
14 files changed, 67 insertions, 69 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index bb7331e5f7..125518358b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -251,7 +251,7 @@ public class AMQChannel
}
catch (NoRouteException e)
{
- //_currentMessage.incrementReference();
+ //_currentMessage.takeReference();
_returnMessages.add(e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 3f1947d65a..415f1fe8be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -61,7 +61,8 @@ public abstract class RequiredDeliveryException extends AMQException
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
- _amqMessage = payload.takeReference();
+ payload.incrementReference(1);
+ _amqMessage = payload;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index db3a05eb52..0f40e00624 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -130,7 +130,7 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (QueueEntry msg : _unacked.values())
{
- msg.getMessage().takeReference();
+ msg.getMessage().incrementReference(1);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 2bd6e612f8..1f56b2ccd2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -126,9 +126,5 @@ public interface AMQMessage
boolean incrementReference(int queueCount);
- boolean incrementReference();
-
- AMQMessage takeReference();
-
boolean isReferenced();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index f7ae04c528..5d4322c4fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -78,6 +78,10 @@ public class IncomingMessage implements Filterable<RuntimeException>
final AMQProtocolSession publisher,
TransactionLog messasgeStore)
{
+ if (publisher == null)
+ {
+ throw new NullPointerException("Message Publisher cannot be null");
+ }
_messagePublishInfo = info;
_txnContext = txnContext;
_publisher = publisher;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index e5e0b6e312..9924733178 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -42,19 +42,19 @@ public class MessageFactory
_messageId = new AtomicLong(0L);
}
- public void start()
+ public void recoveryComplete()
{
_state = State.OPEN;
}
/**
- * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
- */
- protected void enableRecover()
+ * Only to be used by tests as this will cause violate the principal that message IDs should not be reused.
+ */
+ public void reset()
{
_state = State.RECOVER;
+ _messageId = new AtomicLong(0L);
}
-
/**
* Normal message creation path
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index 804bb29ecd..ec48a2afb0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -60,6 +60,7 @@ public class PersistentAMQMessage extends TransientAMQMessage
@Override
public void removeMessage(StoreContext storeContext) throws AMQException
{
+ _log.info("PAMQM : removing message:" + _messageId);
_transactionLog.removeMessage(storeContext, _messageId);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ba14be5580..3eb1636884 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -301,10 +301,16 @@ public class QueueEntryImpl implements QueueEntry
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
+ _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
if(delete())
{
+ _log.info("QEI delete message:" + getMessage().getMessageId());
getMessage().decrementReference(storeContext);
}
+ else
+ {
+ _log.info("QEI delete state wrong:" + getMessage().getMessageId());
+ }
}
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 42f52ce730..a0f21033c7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -918,7 +918,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference();
+ return entry.getMessage().incrementReference(1);
}
}
@@ -1418,7 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index 8c62e046f8..f3d74fb01c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -32,19 +32,17 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
+/** A deliverable message. */
public class TransientAMQMessage implements AMQMessage
{
/** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
+ protected static final Logger _log = Logger.getLogger(AMQMessage.class);
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -58,8 +56,6 @@ public class TransientAMQMessage implements AMQMessage
protected final Long _messageId;
-
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -143,23 +139,23 @@ public class TransientAMQMessage implements AMQMessage
/**
* Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
* These all need refactoring to some sort of MockAMQMessageFactory.
- */
+ */
@Deprecated
protected TransientAMQMessage(AMQMessage message) throws AMQException
{
_messageId = message.getMessageId();
- _flags = ((TransientAMQMessage)message)._flags;
+ _flags = ((TransientAMQMessage) message)._flags;
_contentHeaderBody = message.getContentHeaderBody();
_messagePublishInfo = message.getMessagePublishInfo();
}
-
/**
* Normal message creation via the MessageFactory uses this constructor
* Package scope limited as MessageFactory should be used
- * @see MessageFactory
*
* @param messageId
+ *
+ * @see MessageFactory
*/
TransientAMQMessage(Long messageId)
{
@@ -191,7 +187,6 @@ public class TransientAMQMessage implements AMQMessage
return new BodyContentIterator();
}
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
@@ -202,32 +197,19 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
/* Threadsafe. Increment the reference count on the message. */
public boolean incrementReference(int count)
{
- if(_referenceCount.addAndGet(count) <= 1)
+ if (_referenceCount.addAndGet(count) <= 1)
{
- _referenceCount.addAndGet(-count);
+ int newcount = _referenceCount.addAndGet(-count);
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
return false;
}
else
{
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
+ + _referenceCount.get() + ")");
return true;
}
@@ -247,6 +229,8 @@ public class TransientAMQMessage implements AMQMessage
int count = _referenceCount.decrementAndGet();
+ _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
@@ -256,10 +240,11 @@ public class TransientAMQMessage implements AMQMessage
// set the reference count way below 0 so that we can detect that the message has been deleted
// this is to guard against the message being spontaneously recreated (from the mgmt console)
// by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
+ _referenceCount.set(Integer.MIN_VALUE / 2);
try
{
+ _log.debug("Reference Count hit 0, removing message");
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
// no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
@@ -268,7 +253,7 @@ public class TransientAMQMessage implements AMQMessage
catch (AMQException e)
{
// to maintain consistency, we revert the count
- incrementReference();
+ incrementReference(1);
throw new MessageCleanupException(getMessageId(), e);
}
}
@@ -282,7 +267,6 @@ public class TransientAMQMessage implements AMQMessage
}
}
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -296,10 +280,10 @@ public class TransientAMQMessage implements AMQMessage
/**
* Called to enforce the 'immediate' flag.
*
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
@@ -335,7 +319,6 @@ public class TransientAMQMessage implements AMQMessage
_flags |= DELIVERED_TO_CONSUMER;
}
-
public long getSize()
{
return _contentHeaderBody.bodySize;
@@ -345,7 +328,7 @@ public class TransientAMQMessage implements AMQMessage
{
return _sessionIdentifier.getSessionInstance();
}
-
+
public Object getPublisherIdentifier()
{
return _sessionIdentifier.getSessionIdentifier();
@@ -356,7 +339,7 @@ public class TransientAMQMessage implements AMQMessage
_sessionIdentifier = sessionIdentifier;
}
- /** From AMQMessageHandle **/
+ /** From AMQMessageHandle * */
public int getBodyCount()
{
@@ -365,7 +348,7 @@ public class TransientAMQMessage implements AMQMessage
public ContentChunk getContentChunk(int index)
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
throw new RuntimeException("No ContentBody has been set");
}
@@ -381,9 +364,9 @@ public class TransientAMQMessage implements AMQMessage
public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
throws AMQException
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
- if(isLastContentBody)
+ if (isLastContentBody)
{
_contentBodies = Collections.singletonList(contentChunk);
}
@@ -411,9 +394,10 @@ public class TransientAMQMessage implements AMQMessage
/**
* This is called when all the content has been received.
+ *
* @param storeContext
- *@param messagePublishInfo
- * @param contentHeaderBody @throws AMQException
+ * @param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
*/
public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
@@ -425,7 +409,7 @@ public class TransientAMQMessage implements AMQMessage
throw new NullPointerException("HeaderBody cannot be null");
}
- if( messagePublishInfo == null)
+ if (messagePublishInfo == null)
{
throw new NullPointerException("PublishInfo cannot be null");
}
@@ -433,15 +417,14 @@ public class TransientAMQMessage implements AMQMessage
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
-
- if( contentHeaderBody.bodySize == 0)
+ if (contentHeaderBody.bodySize == 0)
{
_contentBodies = Collections.EMPTY_LIST;
- }
+ }
_arrivalTime = System.currentTimeMillis();
- if(messagePublishInfo.isImmediate())
+ if (messagePublishInfo.isImmediate())
{
_flags |= IMMEDIATE;
}
@@ -457,7 +440,6 @@ public class TransientAMQMessage implements AMQMessage
//no-op
}
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 35d8d9ea6c..fe81346c8c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1357,7 +1357,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
if(message != null)
{
- message.incrementReference();
+ message.incrementReference(1);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index be11eb7b84..1c58e644e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -594,6 +594,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
protected void sendToClient(final QueueEntry entry, final long deliveryTag)
throws AMQException
{
+ _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
_deliveryMethod.deliverToClient(this,entry,deliveryTag);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 9bc2e98fe9..8e63b95f0d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -92,7 +92,7 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference();
+ _message.incrementReference(1);
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 9a4a4c3286..1115ed723c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -238,6 +238,13 @@ public class VirtualHost implements Accessable
" does not.");
}
_transactionLog = (TransactionLog) o;
+
+ //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable)_transactionLog;
+ }
+
_transactionLog.configure(this, "store", config);
}
@@ -261,9 +268,9 @@ public class VirtualHost implements Accessable
}
else
{
- if (_transactionLog instanceof RoutingTable)
+ if (_routingTable == null)
{
- _routingTable = (RoutingTable)_transactionLog;
+ throw new RuntimeException("No Routing Table configured unable to startup.");
}
}
}