summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java102
1 files changed, 40 insertions, 62 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 3c24c67f9b..5fba351d8a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (isMessageListenerSet() && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
+ else
+ {
+ // if we are synchronously waiting for a message
+ // and messages are not pre-fetched we then need to request another one
+ if(capacity == 0)
+ {
+ messageFlow();
+ }
+ }
}
catch (AMQException e)
{
@@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
+
if (!messageOk)
{
if (_preAcquire)
@@ -261,19 +269,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message not OK, releasing");
+ _logger.debug("filterMessage - not ack'ing messaage as not aquired");
}
- releaseMessage(message);
- }
- // if we are syncrhonously waiting for a message
- // and messages are not prefetched we then need to request another one
- if(capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
}
}
+
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk && !isNoConsume())
@@ -285,6 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
+
return messageOk;
}
@@ -295,38 +296,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
- AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
- {
- throw amqe;
- }
- }
- }
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- /**
- * Release a message
- *
- * @param message The message to be released
- * @throws AMQException If the message cannot be released due to some internal error.
- */
- private void releaseMessage(AbstractJMSMessage message) throws AMQException
- {
- if (_preAcquire)
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
{
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.sync();
+ throw amqe;
}
}
@@ -337,25 +318,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
- }
+ final RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
}
return result;
}
+ private void messageFlow()
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
@@ -364,9 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (messageListener != null && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -389,9 +371,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
}
@@ -412,9 +392,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())