summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java182
1 files changed, 83 insertions, 99 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 548e274571..b5f3501e5a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,11 +19,10 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -66,13 +65,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean _preAcquire = true;
/**
+ * Indicate whether this consumer is started.
+ */
+ private boolean _isStarted = false;
+
+ /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
+
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -98,6 +103,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_preAcquire = false;
}
}
+ _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -150,20 +156,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (isMessageListenerSet() && capacity == 0)
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
- else
- {
- // if we are synchronously waiting for a message
- // and messages are not pre-fetched we then need to request another one
- if(capacity == 0)
- {
- messageFlow();
- }
- }
}
catch (AMQException e)
{
@@ -172,6 +171,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
+ //----- overwritten methods
+
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -201,18 +202,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.notifyMessage(messageFrame);
}
- @Override
- protected void preDeliver(AbstractJMSMessage jmsMsg)
+ @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- super.preDeliver(jmsMsg);
-
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ super.preApplicationProcessing(jmsMsg);
+ if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
{
- //For 0-10 we need to ensure that all messages are indicated processed in some way to
- //ensure their AMQP command-id is marked completed, and so we must send a completion
- //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
- //Add message to the unacked message list to ensure we dont lose record of it before
- //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -224,6 +218,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _messageFactory.createMessage(msg.getMessageTransfer());
}
+ // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -252,7 +247,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
-
if (!messageOk)
{
if (_preAcquire)
@@ -269,12 +263,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_logger.isDebugEnabled())
{
- _logger.debug("filterMessage - not ack'ing message as not acquired");
+ _logger.debug("Message not OK, releasing");
}
- flushUnwantedMessage(message);
+ releaseMessage(message);
+ }
+ // if we are syncrhonously waiting for a message
+ // and messages are not prefetched we then need to request another one
+ if(capacity == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
-
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk && !isNoConsume())
@@ -286,7 +287,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
-
return messageOk;
}
@@ -297,38 +297,38 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
- final AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
+ if (!_preAcquire)
{
- throw amqe;
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
/**
- * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
- * processed to ensure their AMQP command-id is marked completed.
+ * Release a message
*
- * @param message The unwanted message to be flushed
- * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+ * @param message The message to be released
+ * @throws AMQException If the message cannot be released due to some internal error.
*/
- private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+ private void releaseMessage(AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.flushProcessed(ranges,false);
-
- final AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
+ if (_preAcquire)
{
- throw amqe;
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.getQpidSession().messageRelease(ranges);
+ _0_10session.sync();
}
}
@@ -339,52 +339,44 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- final RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
+ RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
+ }
}
return result;
}
- private void messageFlow()
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- try
+ if (messageListener != null && capacity == 0)
{
- if (messageListener != null && capacity == 0)
- {
- messageFlow();
- }
- if (messageListener != null && !_synchronousQueue.isEmpty())
- {
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
- catch(TransportException e)
+ if (messageListener != null && !_synchronousQueue.isEmpty())
{
- throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
}
}
@@ -392,7 +384,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
@@ -413,7 +407,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
@@ -444,7 +440,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return o;
}
- void postDeliver(AbstractJMSMessage msg)
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
{
super.postDeliver(msg);
if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
@@ -453,8 +449,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
{
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}
@@ -511,18 +509,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _exclusive;
}
}
-
- void cleanupQueue() throws AMQException, FailoverException
- {
- AMQDestination dest = this.getDestination();
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.RECEIVER )
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
- }
- }
- }
}