summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java138
1 files changed, 82 insertions, 56 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 2a37298a43..1b2d6876bd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* This is a 0.10 message consumer.
*/
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
- implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -114,9 +114,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _consumerTagString;
}
-
- // ----- Interface org.apache.qpid.client.util.MessageListener
-
/**
*
* This is invoked by the session thread when emptying the session message queue.
@@ -152,35 +149,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
}
-
-
- /**
- * This method is invoked by the transport layer when a message is delivered for this
- * consumer. The message is transformed and pass to the session.
- * @param xfr an 0.10 message transfer
- */
- public void messageTransfer(MessageTransfer xfr)
-
- //public void onMessage(Message message)
- {
- int channelId = getSession().getChannelId();
- int consumerTag = getConsumerTag();
-
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(consumerTag, xfr);
-
-
- getSession().messageReceived(newMessage);
- // else ignore this message
- }
-
//----- overwritten methods
/**
@@ -272,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
// now we need to acquire this message if needed
@@ -284,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
- _logger.debug("filterMessage - *************************************");
_logger.debug("filterMessage - message acquire status : " + messageOk);
- _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -304,8 +279,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageAcknowledge(ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
_0_10session.getCurrentException();
}
}
@@ -360,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -374,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- public boolean isStrated()
+ public void failedOverPost()
{
- return _isStarted;
- }
-
- public void start()
- {
- _isStarted = true;
- if (_syncReceive.get())
+ if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
- public void stop()
- {
- _isStarted = false;
- }
-
/**
* When messages are not prefetched we need to request a message from the
* broker.
@@ -405,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
- }
if (! getSession().prefetch())
{
_syncReceive.set(true);
}
+ if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
Object o = super.getMessageFromQueue(l);
+ if (o == null && _0_10session.isStarted())
+ {
+ _0_10session.getQpidSession().messageFlush
+ (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+ _0_10session.getQpidSession().sync();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.BYTE,
+ 0xFFFFFFFF, Option.UNRELIABLE);
+ if (getSession().prefetch())
+ {
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+ _0_10session.getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+ _0_10session.syncDispatchQueue();
+ o = super.getMessageFromQueue(-1);
+ }
if (! getSession().prefetch())
{
_syncReceive.set(false);
@@ -425,10 +411,50 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
super.postDeliver(msg);
- if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ }
+
+ Message receiveBrowse() throws JMSException
+ {
+ return receiveNoWait();
}
+ @Override public void rollbackPendingMessages()
+ {
+ if (_synchronousQueue.size() > 0)
+ {
+ RangeSet ranges = new RangeSet();
+ Iterator iterator = _synchronousQueue.iterator();
+ while (iterator.hasNext())
+ {
+
+ Object o = iterator.next();
+ if (o instanceof AbstractJMSMessage)
+ {
+ ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag());
+ iterator.remove();
+ }
+ else
+ {
+ _logger.error("Queue contained a :" + o.getClass()
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ iterator.remove();
+ }
+ }
+
+ _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ clearReceiveQueue();
+ }
+ }
}