summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-06 16:28:59 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-06 16:28:59 +0000
commit5cc8d8556f3572c2bf43e5381354e1d8dafc1a51 (patch)
tree4848daae1211d803a9c008fe3f6a91345e629d30
parent072d9bfb6a1deda0f390636fc11ba5b366700105 (diff)
downloadqpid-python-5cc8d8556f3572c2bf43e5381354e1d8dafc1a51.tar.gz
QPID-3526: move duplicated methods into the parent abstract message delegate, remove dead code from BassicMessageConsumer
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179694 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java57
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java73
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java67
5 files changed, 72 insertions, 191 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 78741d6290..0dc8c8c3ed 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -838,63 +838,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
-
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
- }
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
- }
- }
-
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
index c2821591d8..a9434edf49 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
@@ -26,9 +26,7 @@ import org.apache.qpid.client.AMQSession;
import javax.jms.Destination;
import javax.jms.JMSException;
-import java.nio.ByteBuffer;
import java.util.Enumeration;
-import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -130,9 +128,9 @@ public interface AMQMessageDelegate
void removeProperty(final String propertyName) throws JMSException;
- void setAMQSession(final AMQSession s);
+ void setAMQSession(final AMQSession<?,?> s);
- AMQSession getAMQSession();
+ AMQSession<?,?> getAMQSession();
long getDeliveryTag();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 6e0316760f..b5a7a0f216 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -37,12 +37,10 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
@@ -76,13 +74,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
private Destination _destination;
-
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
-
protected AMQMessageDelegate_0_10()
{
@@ -92,9 +85,9 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
{
+ super(deliveryTag);
_messageProps = messageProps;
_deliveryProps = deliveryProps;
- _deliveryTag = deliveryTag;
_readableProperties = (_messageProps != null);
AMQDestination dest;
@@ -325,14 +318,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
try
{
- int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
+ int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
}
}
catch(AMQException ex)
@@ -905,64 +898,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
-
-
-
-
-
protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index b9ba946a20..9ab03412fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -30,7 +30,6 @@ import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -60,15 +59,12 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
private ContentHeaderProperties _contentHeaderProperties;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
{
+ super(deliveryTag);
_contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
_readableProperties = (_contentHeaderProperties != null);
_headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
@@ -518,58 +514,4 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
-
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 89fbc9722c..1b6c0c751d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -23,9 +23,13 @@ package org.apache.qpid.client.message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -78,7 +82,25 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE));
-
+ }
+
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession<?,?> _session;
+ private final long _deliveryTag;
+
+ protected AbstractAMQMessageDelegate(long deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
}
/**
@@ -157,6 +179,47 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
return _exchangeMap.containsKey(exchange);
}
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(getDeliveryTag(), true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession<?,?> s)
+ {
+ _session = s;
+ }
+
+ public AMQSession<?,?> getAMQSession()
+ {
+ return _session;
+ }
}
class ExchangeInfo
@@ -202,5 +265,5 @@ class ExchangeInfo
public void setDestType(int destType)
{
this.destType = destType;
- }
+ }
}