From 5cc8d8556f3572c2bf43e5381354e1d8dafc1a51 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 6 Oct 2011 16:28:59 +0000 Subject: QPID-3526: move duplicated methods into the parent abstract message delegate, remove dead code from BassicMessageConsumer Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179694 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageConsumer.java | 57 ----------------- .../qpid/client/message/AMQMessageDelegate.java | 6 +- .../client/message/AMQMessageDelegate_0_10.java | 73 ++-------------------- .../client/message/AMQMessageDelegate_0_8.java | 60 +----------------- .../client/message/AbstractAMQMessageDelegate.java | 67 +++++++++++++++++++- 5 files changed, 72 insertions(+), 191 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 78741d6290..0dc8c8c3ed 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -838,63 +838,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return null; } - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeDelivered() - { - synchronized(_commitLock) - { - ArrayList tagsToAck = new ArrayList(); - - while (!_receivedDeliveryTags.isEmpty()) - { - tagsToAck.add(_receivedDeliveryTags.poll()); - } - - Collections.sort(tagsToAck); - - long prevAcked = _lastAcked; - long oldAckPoint = -1; - - while(oldAckPoint != prevAcked) - { - oldAckPoint = prevAcked; - - Iterator tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) - { - tagsToAckIterator.remove(); - prevAcked++; - } - - Iterator previousAckIterator = _previouslyAcked.iterator(); - while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) - { - previousAckIterator.remove(); - prevAcked++; - } - - } - if(prevAcked != _lastAcked) - { - _session.acknowledgeMessage(prevAcked, true); - _lastAcked = prevAcked; - } - - Iterator tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext()) - { - Long tag = tagsToAckIterator.next(); - _session.acknowledgeMessage(tag, false); - _previouslyAcked.add(tag); - } - } - } - - void notifyError(Throwable cause) { // synchronized (_closed) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java index c2821591d8..a9434edf49 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -26,9 +26,7 @@ import org.apache.qpid.client.AMQSession; import javax.jms.Destination; import javax.jms.JMSException; -import java.nio.ByteBuffer; import java.util.Enumeration; -import java.util.Map; import java.util.UUID; public interface AMQMessageDelegate @@ -130,9 +128,9 @@ public interface AMQMessageDelegate void removeProperty(final String propertyName) throws JMSException; - void setAMQSession(final AMQSession s); + void setAMQSession(final AMQSession s); - AMQSession getAMQSession(); + AMQSession getAMQSession(); long getDeliveryTag(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 6e0316760f..b5a7a0f216 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -37,12 +37,10 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; -import javax.jms.Session; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; @@ -76,13 +74,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate private Destination _destination; - private MessageProperties _messageProps; private DeliveryProperties _deliveryProps; - /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - private AMQSession _session; - private final long _deliveryTag; - protected AMQMessageDelegate_0_10() { @@ -92,9 +85,9 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) { + super(deliveryTag); _messageProps = messageProps; _deliveryProps = deliveryProps; - _deliveryTag = deliveryTag; _readableProperties = (_messageProps != null); AMQDestination dest; @@ -325,14 +318,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { try { - int type = ((AMQSession_0_10)_session).resolveAddressType(amqd); + int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd); + ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd); } else { - ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd); + ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd); } } catch(AMQException ex) @@ -905,64 +898,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate _readableProperties = false; } - - public void acknowledgeThis() throws JMSException - { - // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge - // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - if (_session.getAMQConnection().isClosed()) - { - throw new javax.jms.IllegalStateException("Connection is already closed"); - } - - // we set multiple to true here since acknowledgment implies acknowledge of all previous messages - // received on the session - _session.acknowledgeMessage(_deliveryTag, true); - } - } - - public void acknowledge() throws JMSException - { - if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - _session.acknowledge(); - } - } - - - /** - * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls - * acknowledge() - * - * @param s the AMQ session that delivered this message - */ - public void setAMQSession(AMQSession s) - { - _session = s; - } - - public AMQSession getAMQSession() - { - return _session; - } - - /** - * Get the AMQ message number assigned to this message - * - * @return the message number - */ - public long getDeliveryTag() - { - return _deliveryTag; - } - - - - - - protected void checkPropertyName(CharSequence propertyName) { if (propertyName == null) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index b9ba946a20..9ab03412fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -30,7 +30,6 @@ import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; -import javax.jms.Session; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -60,15 +59,12 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); private ContentHeaderProperties _contentHeaderProperties; - /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - private AMQSession _session; - private final long _deliveryTag; // The base set of items that needs to be set. private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) { + super(deliveryTag); _contentHeaderProperties = properties; - _deliveryTag = deliveryTag; _readableProperties = (_contentHeaderProperties != null); _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders() : (new BasicContentHeaderProperties()).getHeaders() ); @@ -518,58 +514,4 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate _readableProperties = false; } - - - public void acknowledgeThis() throws JMSException - { - // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge - // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - if (_session.getAMQConnection().isClosed()) - { - throw new javax.jms.IllegalStateException("Connection is already closed"); - } - - // we set multiple to true here since acknowledgement implies acknowledge of all previous messages - // received on the session - _session.acknowledgeMessage(_deliveryTag, true); - } - } - - public void acknowledge() throws JMSException - { - if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - _session.acknowledge(); - } - } - - - /** - * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls - * acknowledge() - * - * @param s the AMQ session that delivered this message - */ - public void setAMQSession(AMQSession s) - { - _session = s; - } - - public AMQSession getAMQSession() - { - return _session; - } - - /** - * Get the AMQ message number assigned to this message - * - * @return the message number - */ - public long getDeliveryTag() - { - return _deliveryTag; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 89fbc9722c..1b6c0c751d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -23,9 +23,13 @@ package org.apache.qpid.client.message; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.jms.JMSException; +import javax.jms.Session; + import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -78,7 +82,25 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(), ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE)); - + } + + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ + private AMQSession _session; + private final long _deliveryTag; + + protected AbstractAMQMessageDelegate(long deliveryTag) + { + _deliveryTag = deliveryTag; + } + + /** + * Get the AMQ message number assigned to this message + * + * @return the message number + */ + public long getDeliveryTag() + { + return _deliveryTag; } /** @@ -157,6 +179,47 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { return _exchangeMap.containsKey(exchange); } + + public void acknowledgeThis() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + if (_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _session.acknowledgeMessage(getDeliveryTag(), true); + } + } + + public void acknowledge() throws JMSException + { + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + _session.acknowledge(); + } + } + + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _session = s; + } + + public AMQSession getAMQSession() + { + return _session; + } } class ExchangeInfo @@ -202,5 +265,5 @@ class ExchangeInfo public void setDestType(int destType) { this.destType = destType; - } + } } -- cgit v1.2.1