diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-10 08:31:42 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-10 08:31:42 +0000 |
commit | f0675254ce88655bd97c7e8e4c214754b5dcd674 (patch) | |
tree | 6a221fe8d39a4f057df901f6df7694dec3566e7a | |
parent | 5056e239f4fd0720a99610278c97c12f1d8b56dc (diff) | |
download | qpid-python-f0675254ce88655bd97c7e8e4c214754b5dcd674.tar.gz |
QPID-273 : (Patch supplied by Rob Godfrey) Remove unnecessary Map creation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494765 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java | 272 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 88 |
2 files changed, 174 insertions, 186 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 934bca991d..ba7000f822 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -26,6 +26,7 @@ import java.util.HashMap; import javax.jms.DeliveryMode; import javax.jms.JMSException; +import javax.jms.Message; //import org.apache.activemq.command.ActiveMQDestination; //import org.apache.activemq.command.Message; @@ -45,225 +46,232 @@ import org.apache.log4j.Logger; public class PropertyExpression implements Expression { - interface SubExpression - { - public Object evaluate(AMQMessage message) throws AMQException; - } - - interface JMSExpression - { - public abstract Object evaluate(JMSMessage message); - } - - static class SubJMSExpression implements SubExpression - { - JMSExpression _expression; - - SubJMSExpression(JMSExpression expression) - { - _expression = expression; - } - public Object evaluate(AMQMessage message) throws AMQException - { - JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE); - if (msg != null) - { - return _expression.evaluate(msg); - } - else - { - return null; - } - } - } - private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); - static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); + static final private HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>(); static { - JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSDestination(); + //TODO + return null; } } - )); -// -// public Object evaluate(AMQMessage message) -// { -// //fixme -// -// -//// AMQDestination dest = message.getOriginalDestination(); -//// if (dest == null) -//// { -//// dest = message.getDestination(); -//// } -//// if (dest == null) -//// { -//// return null; -//// } -//// return dest.toString(); -// return ""; -// } -// }); - JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression( - new JMSExpression() - { - public Object evaluate(JMSMessage message) + ); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() + { + public Object evaluate(AMQMessage message) { - return message.getJMSReplyTo(); + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getReplyTo(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } - }) - ); - JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression( - new JMSExpression() + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSType(); + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getType(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } } - )); + ); - JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { try { - Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - _logger.info("JMSDeliveryMode is :" + mode); + int mode = message.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; + if(_logger.isDebugEnabled()) + { + _logger.debug("JMSDeliveryMode is :" + mode); + } return mode; } catch (AMQException e) { - //shouldn't happen + _logger.warn(e); } return DeliveryMode.NON_PERSISTENT; } - })); + }); - JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSPriority(); + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return (int) _properties.getPriority(); + } + catch (AMQException e) + { + _logger.warn(e); + } + return Message.DEFAULT_PRIORITY; } } - )); + ); - JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getAMQMessage().getMessageId(); + + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getMessageId(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } } - )); + ); - JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSTimestamp(); + + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getTimestamp(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } } - )); + ); - JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSCorrelationID(); + + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getCorrelationId(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } } - )); + ); - JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getJMSExpiration(); + + try + { + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + return _properties.getExpiration(); + } + catch (AMQException e) + { + _logger.warn(e); + return null; + } + } - } - )); + }); - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression( - new JMSExpression() + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", + new Expression() { - public Object evaluate(JMSMessage message) + public Object evaluate(AMQMessage message) { - return message.getAMQMessage().isRedelivered(); + return message.isRedelivered(); } } - )); + ); } private final String name; - private final SubExpression jmsPropertyExpression; + private final Expression jmsPropertyExpression; public PropertyExpression(String name) { this.name = name; - jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name); + jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); } public Object evaluate(AMQMessage message) throws AMQException { -// try -// { -// if (message.isDropped()) -// { -// return null; -// } if (jmsPropertyExpression != null) { return jmsPropertyExpression.evaluate(message); } -// try + else { BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - _logger.info("Looking up property:" + name); - _logger.info("Properties are:" + _properties.getHeaders().keySet()); + if(_logger.isDebugEnabled()) + { + _logger.debug("Looking up property:" + name); + _logger.debug("Properties are:" + _properties.getHeaders().keySet()); + } return _properties.getHeaders().getObject(name); } -// catch (IOException ioe) -// { -// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage()); -// exception.initCause(ioe); -// throw exception; -// } -// } -// catch (IOException e) -// { -// JMSException exception = new JMSException(e.getMessage()); -// exception.initCause(e); -// throw exception; -// } - } public String getName() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 05b4f5ec2b..b56523f904 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -48,7 +48,7 @@ public class AMQMessage /** * Used in clustering */ - private final Set<Object> _tokens = new HashSet<Object>(); + private Set<Object> _tokens; /** * Only use in clustering - should ideally be removed? @@ -71,7 +71,6 @@ public class AMQMessage */ private boolean _deliveredToConsumer; - private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); @@ -167,7 +166,7 @@ public class AMQMessage _messageId = messageId; _txnContext = txnContext; _transientMessageData.setPublishBody(publishBody); - _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); + _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { @@ -396,39 +395,6 @@ public class AMQMessage } - public MessageDecorator getDecodedMessage(String type) throws AMQException - { - MessageDecorator msgtype = null; - - if (_decodedMessages != null) - { - msgtype = _decodedMessages.get(type); - - if (msgtype == null) - { - msgtype = decorateMessage(type); - } - } - - return msgtype; - } - - private MessageDecorator decorateMessage(String type) throws AMQException - { - MessageDecorator msgdec = null; - - if (type.equals(JMS_MESSAGE)) - { - msgdec = new JMSMessage(this); - } - - if (msgdec != null) - { - _decodedMessages.put(type, msgdec); - } - - return msgdec; - } public boolean taken() { @@ -442,6 +408,12 @@ public class AMQMessage public boolean checkToken(Object token) { + + if(_tokens==null) + { + _tokens = new HashSet<Object>(); + } + if (_tokens.contains(token)) { return true; @@ -569,33 +541,41 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) + final int bodyCount = _messageHandle.getBodyCount(_messageId); + if(bodyCount == 0) { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); protocolSession.writeFrame(compositeBlock); } else { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, - new AMQDataBlock[]{contentHeader}); + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + + AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); protocolSession.writeFrame(compositeBlock); - } - // - // Now start writing out the other content bodies - // - while (bodyFrameIterator.hasNext()) - { - protocolSession.writeFrame(bodyFrameIterator.next()); + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentBody(_messageId, i); + protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); + } + + } + } private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag) |