summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-16 11:43:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-16 11:43:37 +0000
commit1fdfb841a9787d0f5bacee5489a963aaf522c332 (patch)
treef18b20a6617d78df4bd98f7b26b259ad5ae96117
parent48a474cba1f1ecd98a60810c4f02b6bda1e27172 (diff)
downloadqpid-python-1fdfb841a9787d0f5bacee5489a963aaf522c332.tar.gz
QPID-933 : performance tweaks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648672 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java58
12 files changed, 145 insertions, 82 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index b69a917081..5b0f3cf5eb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -47,7 +47,7 @@ public interface UnacknowledgedMessageMap
void add(long deliveryTag, UnacknowledgedMessage message);
- void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+ void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
boolean contains(long deliveryTag) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 20ee646a40..5204f13e81 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -28,9 +28,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.txn.TransactionalContext;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -51,13 +48,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
_map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
}
- /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
- {
- _lock = lock;
- _map = map;
- } */
-
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ public void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
{
if (multiple)
{
@@ -213,14 +204,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- private void collect(long key, List<UnacknowledgedMessage> msgs)
+ private void collect(Long key, List<UnacknowledgedMessage> msgs)
{
synchronized (_lock)
{
for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
{
msgs.add(entry.getValue());
- if (entry.getKey() == key)
+ if (entry.getKey().equals(key))
{
break;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 4bc53dfe03..48d2ca9bc9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -179,26 +179,31 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
+
+
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- final boolean isRedelivered = messageHandle.isRedelivered();
- final AMQShortString exchangeName = pb.getExchange();
- final AMQShortString routingKey = pb.getRoutingKey();
-
final AMQBody returnBlock = new AMQBody()
{
+
+
+ private final boolean _isRedelivered = messageHandle.isRedelivered();
+ private final AMQShortString _exchangeName = pb.getExchange();
+ private final AMQShortString _routingKey = pb.getRoutingKey();
+
+
public AMQBody _underlyingBody;
public AMQBody createAMQBody()
{
return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ _isRedelivered,
+ _exchangeName,
+ _routingKey);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index d8dbf97e49..ad1c507c04 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -265,10 +265,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
*/
public void messageSent(IoSession protocolSession, Object object) throws Exception
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message sent: " + object);
- }
}
protected boolean isSSLClient(ConnectorConfiguration connectionConfig,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 630186991b..0b40f01f1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -40,7 +41,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
private MessagePublishInfo _messagePublishInfo;
- private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
+ private List<ContentChunk> _contentBodies = new ArrayList<ContentChunk>();
private boolean _redelivered;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index bde3ad8ec9..05cd461582 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -254,13 +254,6 @@ public class SubscriptionImpl implements Subscription
{
long deliveryTag = channel.getNextDeliveryTag();
- // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
- // received the message. If it is lost in transit that is not important.
-// if (_acks)
-// {
-// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
-// }
-
if (_sendLock.get())
{
_logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
@@ -283,25 +276,23 @@ public class SubscriptionImpl implements Subscription
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
+ final AMQMessage message = entry.getMessage();
+
if (!_acks)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
+ _logger.debug("No ack mode so dequeuing message immediately: " + message.getMessageId());
}
queue.dequeue(storeContext, entry);
}
-/*
- if (_sendLock.get())
- {
- _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
- }
-*/
+ final ProtocolOutputConverter outputConverter = protocolSession.getProtocolOutputConverter();
+ final int channelId = channel.getChannelId();
synchronized (channel)
{
- long deliveryTag = channel.getNextDeliveryTag();
+ final long deliveryTag = channel.getNextDeliveryTag();
if (_acks)
@@ -309,13 +300,13 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
}
- protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
+ outputConverter.writeDeliver(message, channelId, deliveryTag, consumerTag);
}
if (!_acks)
{
- entry.getMessage().decrementReference(storeContext);
+ message.decrementReference(storeContext);
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
index 9b784069dd..dee1676632 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
@@ -28,14 +28,18 @@ import org.apache.qpid.server.security.access.AccessResult;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.access.Permission;
import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
public class AllowAll implements ACLPlugin
{
+
+ private static final Logger _logger = ACLManager.getLogger();
+
public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
{
- if (ACLManager.getLogger().isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ _logger.debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ " on " + body.getClass().getSimpleName()
+ (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters)));
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d17572ad77..eb776ba786 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -182,6 +182,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_fastAccessConsumers[i] = null;
}
}
+
+
+ public void acknowledgeDelivered()
+ {
+
+ for(int i = 0; i<16; i++)
+ {
+ final BasicMessageConsumer c = _fastAccessConsumers[i];
+ if(c != null)
+ {
+ c.acknowledgeDelivered();
+ }
+ }
+ if(!_slowAccessConsumers.isEmpty())
+ {
+ for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+ {
+ i.next().acknowledgeDelivered();
+ }
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ for(int i = 0; i<16; i++)
+ {
+ final BasicMessageConsumer c = _fastAccessConsumers[i];
+ if(c != null)
+ {
+ c.acknowledge();
+ }
+ }
+ if(!_slowAccessConsumers.isEmpty())
+ {
+ for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+ {
+ i.next().acknowledge();
+ }
+ }
+ }
}
@@ -500,10 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new IllegalStateException("Session is already closed");
}
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- consumer.acknowledge();
- }
+ _consumers.acknowledge();
}
/**
@@ -725,12 +762,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// We only need to find the highest value and ack that as commit is session level.
Long lastTag = -1L;
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().acknowledgeDelivered();
- }
+ _consumers.acknowledgeDelivered();
- if (_transacted)
+ if (_transacted && !_removedConsumers.isEmpty())
{
// Do the above, but for consumers which have been de-registered since the
// last commit
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 87df7e1337..4de6a1d410 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -71,8 +71,17 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- data.put(cb.payload);
- cb.payload.release();
+ final ByteBuffer payload = cb.payload;
+ if(payload.isDirect() || payload.isReadOnly())
+ {
+ data.put(payload);
+ }
+ else
+ {
+ data.put(payload.array(), payload.arrayOffset(), payload.limit());
+ }
+
+ payload.release();
}
data.flip();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index 969df954ce..9d39f8aa86 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -65,8 +65,15 @@ public class ContentBody implements AMQBody
{
if (payload != null)
{
- ByteBuffer copy = payload.duplicate();
- buffer.put(copy.rewind());
+ if(payload.isDirect() || payload.isReadOnly())
+ {
+ ByteBuffer copy = payload.duplicate();
+ buffer.put(copy.rewind());
+ }
+ else
+ {
+ buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index ee6762181d..9ba9b53b13 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -957,16 +957,21 @@ public class FieldTable
if (_encodedForm != null)
{
+ if(buffer.isDirect() || buffer.isReadOnly())
+ {
+ ByteBuffer encodedForm = _encodedForm.duplicate();
- ByteBuffer encodedForm = _encodedForm.duplicate();
+ if (encodedForm.position() != 0)
+ {
+ encodedForm.flip();
+ }
- if (encodedForm.position() != 0)
+ buffer.put(encodedForm);
+ }
+ else
{
- encodedForm.flip();
+ buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize);
}
- // _encodedForm.limit((int)getEncodedSize());
-
- buffer.put(encodedForm);
}
else if (_properties != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
index 2049797619..2fd4f70138 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -45,31 +45,21 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
public AMQBody convertToBody(ContentChunk contentChunk)
{
- return new ContentBody(contentChunk.getData());
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
}
public ContentChunk convertToContentChunk(AMQBody body)
{
final ContentBody contentBodyChunk = (ContentBody) body;
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
+ return new ContentChunk_0_9(contentBodyChunk);
}
@@ -149,4 +139,34 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
return _routingKey;
}
}
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
}