diff options
Diffstat (limited to 'java/broker/src')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 38cf14d772..18f1836185 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -110,6 +110,9 @@ public class NonTransactionalContext implements TransactionalContext boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { + + final boolean debug = _log.isDebugEnabled(); + ; if (multiple) { if (deliveryTag == 0) @@ -123,11 +126,14 @@ public class NonTransactionalContext implements TransactionalContext { public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException { - if (_log.isDebugEnabled()) + if (debug) { _log.debug("Discarding message: " + message.getMessage().getMessageId()); } - + if(message.getMessage().isPersistent()) + { + beginTranIfNecessary(); + } message.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); @@ -152,11 +158,14 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.drainTo(acked, deliveryTag); for (QueueEntry msg : acked) { - if (_log.isDebugEnabled()) + if (debug) { _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } - + if(msg.getMessage().isPersistent()) + { + beginTranIfNecessary(); + } //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); @@ -176,20 +185,29 @@ public class NonTransactionalContext implements TransactionalContext _channel.getChannelId()); } - if (_log.isDebugEnabled()) + if (debug) { _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } + if(msg.getMessage().isPersistent()) + { + beginTranIfNecessary(); + } //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); - if (_log.isDebugEnabled()) + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + msg.getMessage().getMessageId()); } } + if(_inTran) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } } public void messageFullyReceived(boolean persistent) throws AMQException |