summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java74
1 files changed, 49 insertions, 25 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7d535643c0..8b17dcf91f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
@@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
// now we need to acquire this message if needed
@@ -258,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
- _logger.debug("filterMessage - *************************************");
_logger.debug("filterMessage - message acquire status : " + messageOk);
- _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -335,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -349,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- public boolean isStrated()
+ public void failedOverPost()
{
- return _isStarted;
- }
-
- public void start()
- {
- _isStarted = true;
- if (_syncReceive.get())
+ if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
- public void stop()
- {
- _isStarted = false;
- }
-
/**
* When messages are not prefetched we need to request a message from the
* broker.
@@ -380,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
- }
if (! getSession().prefetch())
{
_syncReceive.set(true);
}
+ if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
Object o = super.getMessageFromQueue(l);
+ if (o == null && _0_10session.isStarted())
+ {
+ _0_10session.getQpidSession().messageFlush
+ (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+ _0_10session.getQpidSession().sync();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.BYTE,
+ 0xFFFFFFFF, Option.UNRELIABLE);
+ if (getSession().prefetch())
+ {
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+ _0_10session.getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+ _0_10session.syncDispatchQueue();
+ o = super.getMessageFromQueue(-1);
+ }
if (! getSession().prefetch())
{
_syncReceive.set(false);
@@ -404,6 +415,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ }
+
+ Message receiveBrowse() throws JMSException
+ {
+ return receiveNoWait();
}
}