summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-19 13:26:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-19 13:26:19 +0000
commit4b1cc6b00ded3584ed2f11431845de09f195ed14 (patch)
tree7819b441fe93744d93b32d0129aa6c350d0c9367
parent221e46668fe6e1830153e66cef3d0e9a7f8d8477 (diff)
downloadqpid-python-4b1cc6b00ded3584ed2f11431845de09f195ed14.tar.gz
Refactoring perf. tweaks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@657827 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java13
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java5
10 files changed, 130 insertions, 78 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index c38e65fb0c..0a6bfb15e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -113,6 +113,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
+ private ProtocolSessionIdentifier _sessionIdentifier;
public ManagedObject getManagedObject()
{
@@ -702,6 +703,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
}
}
+ _sessionIdentifier = new ProtocolSessionIdentifier(this);
}
private void setProtocolVersion(ProtocolVersion pv)
@@ -789,6 +791,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _dispatcher;
}
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
public String getClientVersion()
{
return (_clientVersion == null) ? null : _clientVersion.toString();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index c9316f7405..c3400029da 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol;
import javax.security.sasl.SaslServer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
@@ -35,7 +36,27 @@ import java.security.Principal;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
-
+ public static final class ProtocolSessionIdentifier
+ {
+ private final Object _sessionIdentifier;
+ private final Object _sessionInstance;
+
+ ProtocolSessionIdentifier(AMQProtocolSession session)
+ {
+ _sessionIdentifier = session.getClientIdentifier();
+ _sessionInstance = session.getClientProperties() == null ? null : session.getClientProperties().getObject(ClientProperties.instance.toAMQShortString());
+ }
+
+ public Object getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public Object getSessionInstance()
+ {
+ return _sessionInstance;
+ }
+ }
public static interface Task
{
@@ -175,5 +196,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
+
+ public ProtocolSessionIdentifier getSessionIdentifier();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 1df93dd0d8..05533e0d2d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -33,8 +33,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.exchange.Exchange;
-
+
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,21 +53,27 @@ public class AMQMessage implements Filterable<AMQException>
/** Holds the transactional context in which this message is being processed. */
private StoreContext _storeContext;
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ private static final byte IMMEDIATE = 0x01;
+
/**
* Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
* for messages published with the 'immediate' flag.
*/
- private boolean _deliveredToConsumer;
- /** Flag to indicate that this message requires 'immediate' delivery. */
- private boolean _immediate;
+ private static final byte DELIVERED_TO_CONSUMER = 0x02;
+
+ private byte _flags = 0;
private long _expiration;
- private Object _publisherClientInstance;
- private Object _publisherIdentifier;
private final long _size;
+ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -211,7 +216,11 @@ public class AMQMessage implements Filterable<AMQException>
{
_messageHandle = messageHandle;
_storeContext = storeConext;
- _immediate = info.isImmediate();
+
+ if(info.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
_size = messageHandle.getBodySize(storeConext);
}
@@ -221,7 +230,7 @@ public class AMQMessage implements Filterable<AMQException>
{
_messageHandle = msg._messageHandle;
_storeContext = msg._storeContext;
- _deliveredToConsumer = msg._deliveredToConsumer;
+ _flags = msg._flags;
_size = msg._size;
}
@@ -289,10 +298,7 @@ public class AMQMessage implements Filterable<AMQException>
{
return true;
}
- // if (_log.isDebugEnabled())
- // {
- // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
- // }
+
}
/**
@@ -322,11 +328,6 @@ public class AMQMessage implements Filterable<AMQException>
try
{
- // if (_log.isDebugEnabled())
- // {
- // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
- // }
-
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
if (_messageHandle != null)
@@ -359,12 +360,12 @@ public class AMQMessage implements Filterable<AMQException>
*/
public boolean getDeliveredToConsumer()
{
- return _deliveredToConsumer;
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
public boolean isPersistent() throws AMQException
{
- return _messageHandle.isPersistent(getStoreContext());
+ return _messageHandle.isPersistent();
}
/**
@@ -376,7 +377,7 @@ public class AMQMessage implements Filterable<AMQException>
public boolean immediateAndNotDelivered()
{
- return (_immediate && !_deliveredToConsumer);
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
@@ -428,7 +429,7 @@ public class AMQMessage implements Filterable<AMQException>
*/
public void setDeliveredToConsumer()
{
- _deliveredToConsumer = true;
+ _flags |= DELIVERED_TO_CONSUMER;
}
@@ -444,26 +445,22 @@ public class AMQMessage implements Filterable<AMQException>
}
- public void setPublisherClientInstance(final Object publisherClientInstance)
- {
- _publisherClientInstance = publisherClientInstance;
- }
-
public Object getPublisherClientInstance()
{
- return _publisherClientInstance;
+ return _sessionIdentifier.getSessionInstance();
}
public Object getPublisherIdentifier()
{
- return _publisherIdentifier;
+ return _sessionIdentifier.getSessionIdentifier();
}
- public void setPublisherIdentifier(final Object publisherIdentifier)
+ public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
{
- _publisherIdentifier = publisherIdentifier;
+ _sessionIdentifier = sessionIdentifier;
}
+
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index a0db4ba833..0ddd4e4d92 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -67,7 +67,7 @@ public interface AMQMessageHandle
void setRedelivered(boolean redelivered);
- boolean isPersistent(StoreContext context) throws AMQException;
+ boolean isPersistent();
void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 8ef28fbcd2..ba6b392d13 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.AMQException;
public class AMQPriorityQueue extends SimpleAMQQueue
@@ -37,5 +39,29 @@ public class AMQPriorityQueue extends SimpleAMQQueue
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ @Override
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // check that all subscriptions are not in advance of the entry
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ QueueEntry subnode = subscription.getLastSeenEntry();
+ while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
+ {
+ if(subscription.setLastSeenEntry(subnode,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = subscription.getLastSeenEntry();
+ }
+ }
+
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index e7a99ac668..35ad5be4e0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -22,10 +22,10 @@ package org.apache.qpid.server.queue;
import java.util.LinkedList;
import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -40,7 +40,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
private MessagePublishInfo _messagePublishInfo;
- private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
+ private List<ContentChunk> _contentBodies;
private boolean _redelivered;
@@ -86,7 +86,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle
public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
throws AMQException
{
- _contentBodies.add(contentBody);
+ if(_contentBodies == null)
+ {
+ if(isLastContentBody)
+ {
+ _contentBodies = Collections.singletonList(contentBody);
+ }
+ else
+ {
+ _contentBodies = new ArrayList<ContentChunk>();
+ _contentBodies.add(contentBody);
+ }
+ }
+ else
+ {
+ _contentBodies.add(contentBody);
+ }
}
public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
@@ -105,12 +120,9 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(StoreContext context) throws AMQException
+ public boolean isPersistent()
{
- //todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(context);
- return chb.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ return false;
}
/**
@@ -125,6 +137,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle
{
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
+ if(contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = Collections.EMPTY_LIST;
+ }
_arrivalTime = System.currentTimeMillis();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index dcae821604..68b429efc6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -176,13 +176,10 @@ public class IncomingMessage implements Filterable<RuntimeException>
message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
- message.setPublisherIdentifier(_publisher.getClientIdentifier());
+
message.setExpiration(_expiration);
+ message.setClientIdentifier(_publisher.getSessionIdentifier());
- if (_publisher.getClientProperties() != null)
- {
- message.setPublisherClientInstance(_publisher.getClientProperties().getObject(ClientProperties.instance.toAMQShortString()));
- }
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a3709641f0..16d24e74ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -81,7 +81,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicBoolean _quiesced = new AtomicBoolean(false);
- private final SubscriptionList _subscriptionList = new SubscriptionList(this);
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
private boolean _exclusiveSubscriber;
@@ -389,13 +389,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
- deliverMessage(sub, entry);
QueueEntry queueEntryNode = sub.getLastSeenEntry();
if(_entries.next(queueEntryNode) == entry)
{
sub.setLastSeenEntry(queueEntryNode,entry);
}
+ deliverMessage(sub, entry);
+
}
}
}
@@ -414,25 +415,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else if(!entry.isAcquired())
{
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
- {
- if(subscription.setLastSeenEntry(subnode,entry))
- {
- break;
- }
- else
- {
- subnode = subscription.getLastSeenEntry();
- }
- }
+ checkSubscriptionsNotAheadOfDelivery(entry);
- }
deliverAsync();
}
@@ -451,6 +435,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // This method is only required for queues which mess with ordering
+ }
+
private void incrementQueueSize(final AMQMessage message)
{
getAtomicQueueSize().addAndGet(message.getSize());
@@ -1204,6 +1193,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
QueueEntry node = sub.getLastSeenEntry();
+
while(node.isAcquired() || node.isDeleted() || node.expired())
{
if(!node.isAcquired() && !node.isDeleted() && node.expired())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index fb70984d99..3ed8b0e55c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -26,7 +26,6 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -50,8 +49,6 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
private final Long _messageId;
private long _arrivalTime;
- private boolean _persistent;
-
public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
{
@@ -175,9 +172,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(StoreContext context) throws AMQException
+ public boolean isPersistent()
{
- return _persistent;
+ return true;
}
/**
@@ -203,12 +200,6 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
-
-
-
- _persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2;
-
_messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index cf986e7803..a1a405c313 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -215,6 +215,11 @@ public class MockProtocolSession implements AMQProtocolSession
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return null;
+ }
+
public byte getProtocolMajorVersion()
{
return getProtocolVersion().getMajorVersion();