diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-19 13:26:19 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-19 13:26:19 +0000 |
commit | 4b1cc6b00ded3584ed2f11431845de09f195ed14 (patch) | |
tree | 7819b441fe93744d93b32d0129aa6c350d0c9367 | |
parent | 221e46668fe6e1830153e66cef3d0e9a7f8d8477 (diff) | |
download | qpid-python-4b1cc6b00ded3584ed2f11431845de09f195ed14.tar.gz |
Refactoring perf. tweaks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@657827 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 130 insertions, 78 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index c38e65fb0c..0a6bfb15e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -113,6 +113,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; private MethodDispatcher _dispatcher; + private ProtocolSessionIdentifier _sessionIdentifier; public ManagedObject getManagedObject() { @@ -702,6 +703,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); } } + _sessionIdentifier = new ProtocolSessionIdentifier(this); } private void setProtocolVersion(ProtocolVersion pv) @@ -789,6 +791,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _dispatcher; } + public ProtocolSessionIdentifier getSessionIdentifier() + { + return _sessionIdentifier; + } + public String getClientVersion() { return (_clientVersion == null) ? null : _clientVersion.toString(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index c9316f7405..c3400029da 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol; import javax.security.sasl.SaslServer; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; @@ -35,7 +36,27 @@ import java.security.Principal; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { - + public static final class ProtocolSessionIdentifier + { + private final Object _sessionIdentifier; + private final Object _sessionInstance; + + ProtocolSessionIdentifier(AMQProtocolSession session) + { + _sessionIdentifier = session.getClientIdentifier(); + _sessionInstance = session.getClientProperties() == null ? null : session.getClientProperties().getObject(ClientProperties.instance.toAMQShortString()); + } + + public Object getSessionIdentifier() + { + return _sessionIdentifier; + } + + public Object getSessionInstance() + { + return _sessionInstance; + } + } public static interface Task { @@ -175,5 +196,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public MethodRegistry getMethodRegistry(); public MethodDispatcher getMethodDispatcher(); + + public ProtocolSessionIdentifier getSessionIdentifier(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 1df93dd0d8..05533e0d2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -33,8 +33,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.exchange.Exchange; - + import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; @@ -54,21 +53,27 @@ public class AMQMessage implements Filterable<AMQException> /** Holds the transactional context in which this message is being processed. */ private StoreContext _storeContext; + /** Flag to indicate that this message requires 'immediate' delivery. */ + + private static final byte IMMEDIATE = 0x01; + /** * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality * for messages published with the 'immediate' flag. */ - private boolean _deliveredToConsumer; - /** Flag to indicate that this message requires 'immediate' delivery. */ - private boolean _immediate; + private static final byte DELIVERED_TO_CONSUMER = 0x02; + + private byte _flags = 0; private long _expiration; - private Object _publisherClientInstance; - private Object _publisherIdentifier; private final long _size; + private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; + private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory @@ -211,7 +216,11 @@ public class AMQMessage implements Filterable<AMQException> { _messageHandle = messageHandle; _storeContext = storeConext; - _immediate = info.isImmediate(); + + if(info.isImmediate()) + { + _flags |= IMMEDIATE; + } _size = messageHandle.getBodySize(storeConext); } @@ -221,7 +230,7 @@ public class AMQMessage implements Filterable<AMQException> { _messageHandle = msg._messageHandle; _storeContext = msg._storeContext; - _deliveredToConsumer = msg._deliveredToConsumer; + _flags = msg._flags; _size = msg._size; } @@ -289,10 +298,7 @@ public class AMQMessage implements Filterable<AMQException> { return true; } - // if (_log.isDebugEnabled()) - // { - // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); - // } + } /** @@ -322,11 +328,6 @@ public class AMQMessage implements Filterable<AMQException> try { - // if (_log.isDebugEnabled()) - // { - // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); - // } - // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed if (_messageHandle != null) @@ -359,12 +360,12 @@ public class AMQMessage implements Filterable<AMQException> */ public boolean getDeliveredToConsumer() { - return _deliveredToConsumer; + return (_flags & DELIVERED_TO_CONSUMER) != 0; } public boolean isPersistent() throws AMQException { - return _messageHandle.isPersistent(getStoreContext()); + return _messageHandle.isPersistent(); } /** @@ -376,7 +377,7 @@ public class AMQMessage implements Filterable<AMQException> public boolean immediateAndNotDelivered() { - return (_immediate && !_deliveredToConsumer); + return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; } @@ -428,7 +429,7 @@ public class AMQMessage implements Filterable<AMQException> */ public void setDeliveredToConsumer() { - _deliveredToConsumer = true; + _flags |= DELIVERED_TO_CONSUMER; } @@ -444,26 +445,22 @@ public class AMQMessage implements Filterable<AMQException> } - public void setPublisherClientInstance(final Object publisherClientInstance) - { - _publisherClientInstance = publisherClientInstance; - } - public Object getPublisherClientInstance() { - return _publisherClientInstance; + return _sessionIdentifier.getSessionInstance(); } public Object getPublisherIdentifier() { - return _publisherIdentifier; + return _sessionIdentifier.getSessionIdentifier(); } - public void setPublisherIdentifier(final Object publisherIdentifier) + public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) { - _publisherIdentifier = publisherIdentifier; + _sessionIdentifier = sessionIdentifier; } + public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index a0db4ba833..0ddd4e4d92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -67,7 +67,7 @@ public interface AMQMessageHandle void setRedelivered(boolean redelivered); - boolean isPersistent(StoreContext context) throws AMQException; + boolean isPersistent(); void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 8ef28fbcd2..ba6b392d13 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.AMQException; public class AMQPriorityQueue extends SimpleAMQQueue @@ -37,5 +39,29 @@ public class AMQPriorityQueue extends SimpleAMQQueue super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities)); } + @Override + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + { + // check that all subscriptions are not in advance of the entry + SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); + while(subIter.advance() && !entry.isAcquired()) + { + final Subscription subscription = subIter.getNode().getSubscription(); + QueueEntry subnode = subscription.getLastSeenEntry(); + while((entry.compareTo(subnode) < 0) && !entry.isAcquired()) + { + if(subscription.setLastSeenEntry(subnode,entry)) + { + break; + } + else + { + subnode = subscription.getLastSeenEntry(); + } + } + + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index e7a99ac668..35ad5be4e0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -22,10 +22,10 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.Collections; +import java.util.ArrayList; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -40,7 +40,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle private MessagePublishInfo _messagePublishInfo; - private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>(); + private List<ContentChunk> _contentBodies; private boolean _redelivered; @@ -86,7 +86,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody) throws AMQException { - _contentBodies.add(contentBody); + if(_contentBodies == null) + { + if(isLastContentBody) + { + _contentBodies = Collections.singletonList(contentBody); + } + else + { + _contentBodies = new ArrayList<ContentChunk>(); + _contentBodies.add(contentBody); + } + } + else + { + _contentBodies.add(contentBody); + } } public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException @@ -105,12 +120,9 @@ public class InMemoryMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(StoreContext context) throws AMQException + public boolean isPersistent() { - //todo remove literal values to a constant file such as AMQConstants in common - ContentHeaderBody chb = getContentHeaderBody(context); - return chb.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; + return false; } /** @@ -125,6 +137,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle { _messagePublishInfo = messagePublishInfo; _contentHeaderBody = contentHeaderBody; + if(contentHeaderBody.bodySize == 0) + { + _contentBodies = Collections.EMPTY_LIST; + } _arrivalTime = System.currentTimeMillis(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index dcae821604..68b429efc6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -176,13 +176,10 @@ public class IncomingMessage implements Filterable<RuntimeException> message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo); - message.setPublisherIdentifier(_publisher.getClientIdentifier()); + message.setExpiration(_expiration); + message.setClientIdentifier(_publisher.getSessionIdentifier()); - if (_publisher.getClientProperties() != null) - { - message.setPublisherClientInstance(_publisher.getClientProperties().getObject(ClientProperties.instance.toAMQShortString())); - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a3709641f0..16d24e74ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -81,7 +81,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicBoolean _quiesced = new AtomicBoolean(false); - private final SubscriptionList _subscriptionList = new SubscriptionList(this); + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); private boolean _exclusiveSubscriber; @@ -389,13 +389,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - deliverMessage(sub, entry); QueueEntry queueEntryNode = sub.getLastSeenEntry(); if(_entries.next(queueEntryNode) == entry) { sub.setLastSeenEntry(queueEntryNode,entry); } + deliverMessage(sub, entry); + } } } @@ -414,25 +415,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else if(!entry.isAcquired()) { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - QueueEntry subnode = subscription.getLastSeenEntry(); - while((entry.compareTo(subnode) < 0) && !entry.isAcquired()) - { - if(subscription.setLastSeenEntry(subnode,entry)) - { - break; - } - else - { - subnode = subscription.getLastSeenEntry(); - } - } + checkSubscriptionsNotAheadOfDelivery(entry); - } deliverAsync(); } @@ -451,6 +435,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + { + // This method is only required for queues which mess with ordering + } + private void incrementQueueSize(final AMQMessage message) { getAtomicQueueSize().addAndGet(message.getSize()); @@ -1204,6 +1193,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { QueueEntry node = sub.getLastSeenEntry(); + while(node.isAcquired() || node.isDeleted() || node.expired()) { if(!node.isAcquired() && !node.isDeleted() && node.expired()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index fb70984d99..3ed8b0e55c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -50,8 +49,6 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle private final Long _messageId; private long _arrivalTime; - private boolean _persistent; - public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore) { @@ -175,9 +172,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(StoreContext context) throws AMQException + public boolean isPersistent() { - return _persistent; + return true; } /** @@ -203,12 +200,6 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime); - - - - _persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2; - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index cf986e7803..a1a405c313 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -215,6 +215,11 @@ public class MockProtocolSession implements AMQProtocolSession return null; //To change body of implemented methods use File | Settings | File Templates. } + public ProtocolSessionIdentifier getSessionIdentifier() + { + return null; + } + public byte getProtocolMajorVersion() { return getProtocolVersion().getMajorVersion(); |