diff options
Diffstat (limited to 'java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java')
-rw-r--r-- | java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index e321245a0e..58b7d4f625 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -766,7 +766,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession {
while(!_closed)
{
- while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))
+ while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
{
try
{
@@ -777,7 +777,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession return;
}
}
- while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))
+ while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
{
Message msg;
@@ -804,6 +804,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(message != null)
{
+ if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ consumer.setLastUnackedMessage(msg.getDeliveryTag());
+ }
_currentConsumer = consumer;
_currentMessage = msg;
try
@@ -816,11 +820,11 @@ public class SessionImpl implements Session, QueueSession, TopicSession _currentMessage = null;
}
- if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
- || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+ if(_recoveredMessage == null)
{
- consumer.acknowledge(msg);
+ consumer.preReceiveAction(msg);
}
+
}
}
@@ -895,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession {
_isTopicSession = topicSession;
}
+
+ String toAddress(DestinationImpl dest)
+ {
+ return _connection.toDecodedDestination(dest).getAddress();
+ }
+
}
|