summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java')
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java20
1 files changed, 15 insertions, 5 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index e321245a0e..58b7d4f625 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -766,7 +766,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
while(!_closed)
{
- while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))
+ while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
{
try
{
@@ -777,7 +777,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
return;
}
}
- while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))
+ while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
{
Message msg;
@@ -804,6 +804,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession
if(message != null)
{
+ if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ consumer.setLastUnackedMessage(msg.getDeliveryTag());
+ }
_currentConsumer = consumer;
_currentMessage = msg;
try
@@ -816,11 +820,11 @@ public class SessionImpl implements Session, QueueSession, TopicSession
_currentMessage = null;
}
- if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
- || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+ if(_recoveredMessage == null)
{
- consumer.acknowledge(msg);
+ consumer.preReceiveAction(msg);
}
+
}
}
@@ -895,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
_isTopicSession = topicSession;
}
+
+ String toAddress(DestinationImpl dest)
+ {
+ return _connection.toDecodedDestination(dest).getAddress();
+ }
+
}