summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-06 16:40:43 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-06 16:40:43 +0000
commit371bf976678df3ffba6ebc14a7ff0ed676097ce9 (patch)
tree57e5dd34ba81d38dc0ea3fa5cc33bd0141707f00
parent44ac944e1747881a74061ee0221535a03122be69 (diff)
downloadqpid-python-371bf976678df3ffba6ebc14a7ff0ed676097ce9.tar.gz
QPID-3527: update handling of auto-ack messages for 0-10 to send acks (asynchronously) on a per-message basis rather than batching for 1 second, update handling for other ack modes to be clearer with respect to 0-8/0-10 behavioural differences. Remove some redundant methods from AMQSession, updating handling of 'no consume'/'isBrowseOnly' such that BasicMessageConsumer is always supplied a single consistent answer for whether it is non-consuming or not.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179698 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java1
6 files changed, 53 insertions, 80 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 3ef32fb008..acd46da11a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -793,7 +793,7 @@ public abstract class AMQDestination implements Destination, Referenceable
return _browseOnly;
}
- public void setBrowseOnly(boolean b)
+ private void setBrowseOnly(boolean b)
{
_browseOnly = b;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a41f2f9b17..a477832892 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -952,7 +952,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -966,15 +966,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public C createExclusiveConsumer(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -982,7 +974,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -991,16 +983,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
- messageSelector, null, false, false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1008,23 +991,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector, FieldTable rawSelector) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1032,7 +1007,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
false);
}
@@ -1438,9 +1413,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
}
/**
@@ -1457,10 +1433,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+ true, messageSelector, null, false, false));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1985,6 +1962,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkTemporaryDestination(destination);
+ if(!noConsume && isBrowseOnlyDestination(destination))
+ {
+ throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+ "but a 'browseOnly' Destination has been supplied.");
+ }
+
final String messageSelector;
if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -3526,4 +3509,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
return code;
}
+
+ private boolean isBrowseOnlyDestination(Destination destination)
+ {
+ return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8f77cc6258..86e1fc08de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -270,7 +270,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index e6e1398a35..3b807591b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -734,34 +734,27 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
+ case Session.AUTO_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- _session.addUnacknowledgedMessage(msg.getDeliveryTag());
- _session.markDirty();
- }
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
break;
case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _session.addDeliveredMessage(msg.getDeliveryTag());
- _session.markDirty();
- }
-
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ //do nothing.
+ //path used for NO-ACK consumers, and browsers (see constructor).
break;
}
-
}
void postDeliver(AbstractJMSMessage msg)
@@ -883,7 +876,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public boolean isNoConsume()
{
- return _noConsume || _destination.isBrowseOnly() ;
+ return _noConsume;
}
public void rollback()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index d3494298d3..3c24c67f9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -66,19 +66,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean _preAcquire = true;
/**
- * Indicate whether this consumer is started.
- */
- private boolean _isStarted = false;
-
- /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
- //--- constructor
+
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -104,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_preAcquire = false;
}
}
- _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -172,8 +165,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- //----- overwritten methods
-
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -208,8 +199,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
super.preDeliver(jmsMsg);
- if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
+ //For 0-10 we need to ensure that all messages are indicated processed in some way to
+ //ensure their AMQP command-id is marked completed, and so we must send a completion
+ //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
+ //Add message to the unacked message list to ensure we dont lose record of it before
+ //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -221,7 +217,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _messageFactory.createMessage(msg.getMessageTransfer());
}
- // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -459,10 +454,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() &&
- _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
{
- ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index b5a7a0f216..f360b546b2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -198,7 +198,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
}
-
public long getJMSTimestamp() throws JMSException
{
return _deliveryProps.getTimestamp();