summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java29
1 files changed, 12 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 47da59724c..3c24c67f9b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -66,19 +66,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean _preAcquire = true;
/**
- * Indicate whether this consumer is started.
- */
- private boolean _isStarted = false;
-
- /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
- //--- constructor
+
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -104,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_preAcquire = false;
}
}
- _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -172,8 +165,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- //----- overwritten methods
-
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -203,11 +194,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.notifyMessage(messageFrame);
}
- @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ @Override
+ protected void preDeliver(AbstractJMSMessage jmsMsg)
{
- super.preApplicationProcessing(jmsMsg);
- if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+ super.preDeliver(jmsMsg);
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
+ //For 0-10 we need to ensure that all messages are indicated processed in some way to
+ //ensure their AMQP command-id is marked completed, and so we must send a completion
+ //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
+ //Add message to the unacked message list to ensure we dont lose record of it before
+ //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -219,7 +217,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _messageFactory.createMessage(msg.getMessageTransfer());
}
- // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -457,10 +454,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() &&
- _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
{
- ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}