diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 11:43:37 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 11:43:37 +0000 |
commit | 1fdfb841a9787d0f5bacee5489a963aaf522c332 (patch) | |
tree | f18b20a6617d78df4bd98f7b26b259ad5ae96117 | |
parent | 48a474cba1f1ecd98a60810c4f02b6bda1e27172 (diff) | |
download | qpid-python-1fdfb841a9787d0f5bacee5489a963aaf522c332.tar.gz |
QPID-933 : performance tweaks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648672 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 145 insertions, 82 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index b69a917081..5b0f3cf5eb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -47,7 +47,7 @@ public interface UnacknowledgedMessageMap void add(long deliveryTag, UnacknowledgedMessage message); - void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); + void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); boolean contains(long deliveryTag) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 20ee646a40..5204f13e81 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -28,9 +28,6 @@ import java.util.Map; import java.util.Set; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.txn.TransactionalContext; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap @@ -51,13 +48,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit); } - /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) - { - _lock = lock; - _map = map; - } */ - - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) + public void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) { if (multiple) { @@ -213,14 +204,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - private void collect(long key, List<UnacknowledgedMessage> msgs) + private void collect(Long key, List<UnacknowledgedMessage> msgs) { synchronized (_lock) { for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) { msgs.add(entry.getValue()); - if (entry.getKey() == key) + if (entry.getKey().equals(key)) { break; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 4bc53dfe03..48d2ca9bc9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -179,26 +179,31 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
+
+
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- final boolean isRedelivered = messageHandle.isRedelivered();
- final AMQShortString exchangeName = pb.getExchange();
- final AMQShortString routingKey = pb.getRoutingKey();
-
final AMQBody returnBlock = new AMQBody()
{
+
+
+ private final boolean _isRedelivered = messageHandle.isRedelivered();
+ private final AMQShortString _exchangeName = pb.getExchange();
+ private final AMQShortString _routingKey = pb.getRoutingKey();
+
+
public AMQBody _underlyingBody;
public AMQBody createAMQBody()
{
return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ _isRedelivered,
+ _exchangeName,
+ _routingKey);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index d8dbf97e49..ad1c507c04 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -265,10 +265,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter */ public void messageSent(IoSession protocolSession, Object object) throws Exception { - if (_logger.isDebugEnabled()) - { - _logger.debug("Message sent: " + object); - } } protected boolean isSSLClient(ConnectorConfiguration connectionConfig, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 630186991b..0b40f01f1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -40,7 +41,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle private MessagePublishInfo _messagePublishInfo; - private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>(); + private List<ContentChunk> _contentBodies = new ArrayList<ContentChunk>(); private boolean _redelivered; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index bde3ad8ec9..05cd461582 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -254,13 +254,6 @@ public class SubscriptionImpl implements Subscription { long deliveryTag = channel.getNextDeliveryTag(); - // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client - // received the message. If it is lost in transit that is not important. -// if (_acks) -// { -// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); -// } - if (_sendLock.get()) { _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); @@ -283,25 +276,23 @@ public class SubscriptionImpl implements Subscription // The send may of course still fail, in which case, as // the message is unacked, it will be lost. + final AMQMessage message = entry.getMessage(); + if (!_acks) { if (_logger.isDebugEnabled()) { - _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId()); + _logger.debug("No ack mode so dequeuing message immediately: " + message.getMessageId()); } queue.dequeue(storeContext, entry); } -/* - if (_sendLock.get()) - { - _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); - } -*/ + final ProtocolOutputConverter outputConverter = protocolSession.getProtocolOutputConverter(); + final int channelId = channel.getChannelId(); synchronized (channel) { - long deliveryTag = channel.getNextDeliveryTag(); + final long deliveryTag = channel.getNextDeliveryTag(); if (_acks) @@ -309,13 +300,13 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag); } - protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); + outputConverter.writeDeliver(message, channelId, deliveryTag, consumerTag); } if (!_acks) { - entry.getMessage().decrementReference(storeContext); + message.decrementReference(storeContext); } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java index 9b784069dd..dee1676632 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -28,14 +28,18 @@ import org.apache.qpid.server.security.access.AccessResult; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.access.Permission; import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; public class AllowAll implements ACLPlugin { + + private static final Logger _logger = ACLManager.getLogger(); + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) { - if (ACLManager.getLogger().isDebugEnabled()) + if (_logger.isDebugEnabled()) { - ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + _logger.debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + " on " + body.getClass().getSimpleName() + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters))); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d17572ad77..eb776ba786 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -182,6 +182,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _fastAccessConsumers[i] = null; } } + + + public void acknowledgeDelivered() + { + + for(int i = 0; i<16; i++) + { + final BasicMessageConsumer c = _fastAccessConsumers[i]; + if(c != null) + { + c.acknowledgeDelivered(); + } + } + if(!_slowAccessConsumers.isEmpty()) + { + for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) + { + i.next().acknowledgeDelivered(); + } + } + } + + public void acknowledge() throws JMSException + { + for(int i = 0; i<16; i++) + { + final BasicMessageConsumer c = _fastAccessConsumers[i]; + if(c != null) + { + c.acknowledge(); + } + } + if(!_slowAccessConsumers.isEmpty()) + { + for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) + { + i.next().acknowledge(); + } + } + } } @@ -500,10 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new IllegalStateException("Session is already closed"); } - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.acknowledge(); - } + _consumers.acknowledge(); } /** @@ -725,12 +762,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // We only need to find the highest value and ack that as commit is session level. Long lastTag = -1L; - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().acknowledgeDelivered(); - } + _consumers.acknowledgeDelivered(); - if (_transacted) + if (_transacted && !_removedConsumers.isEmpty()) { // Do the above, but for consumers which have been de-registered since the // last commit diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 87df7e1337..4de6a1d410 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -71,8 +71,17 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - data.put(cb.payload); - cb.payload.release(); + final ByteBuffer payload = cb.payload; + if(payload.isDirect() || payload.isReadOnly()) + { + data.put(payload); + } + else + { + data.put(payload.array(), payload.arrayOffset(), payload.limit()); + } + + payload.release(); } data.flip(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 969df954ce..9d39f8aa86 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -65,8 +65,15 @@ public class ContentBody implements AMQBody { if (payload != null) { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); + if(payload.isDirect() || payload.isReadOnly()) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + else + { + buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index ee6762181d..9ba9b53b13 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -957,16 +957,21 @@ public class FieldTable if (_encodedForm != null) { + if(buffer.isDirect() || buffer.isReadOnly()) + { + ByteBuffer encodedForm = _encodedForm.duplicate(); - ByteBuffer encodedForm = _encodedForm.duplicate(); + if (encodedForm.position() != 0) + { + encodedForm.flip(); + } - if (encodedForm.position() != 0) + buffer.put(encodedForm); + } + else { - encodedForm.flip(); + buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize); } - // _encodedForm.limit((int)getEncodedSize()); - - buffer.put(encodedForm); } else if (_properties != null) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index 2049797619..2fd4f70138 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -45,31 +45,21 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot public AMQBody convertToBody(ContentChunk contentChunk)
{
- return new ContentBody(contentChunk.getData());
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
}
public ContentChunk convertToContentChunk(AMQBody body)
{
final ContentBody contentBodyChunk = (ContentBody) body;
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
+ return new ContentChunk_0_9(contentBodyChunk);
}
@@ -149,4 +139,34 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return _routingKey;
}
}
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
}
|