diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:40:42 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:40:42 +0000 |
commit | 56e72efee5eeefa3d73df1f9fbd77058d017a8ee (patch) | |
tree | 00533c79bc0c30acc602f500888dd320a419556e | |
parent | f86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5 (diff) | |
download | qpid-python-56e72efee5eeefa3d73df1f9fbd77058d017a8ee.tar.gz |
QPID-926 : Perform all store operations associated with an acknowledge in a single store transaction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648648 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 1e4b69c935..cac3489f4c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,8 +49,6 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List<RequiredDeliveryException> _returnMessages; - private final Set<Long> _browsedAcks; - private final MessageStore _messageStore; private final StoreContext _storeContext; @@ -61,11 +59,17 @@ public class NonTransactionalContext implements TransactionalContext public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks) { + this(messageStore,storeContext,channel,returnMessages); + } + + public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, + List<RequiredDeliveryException> returnMessages) + { _channel = channel; _storeContext = storeContext; _returnMessages = returnMessages; _messageStore = messageStore; - _browsedAcks = browsedAcks; + } @@ -112,6 +116,9 @@ public class NonTransactionalContext implements TransactionalContext boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { + + final boolean debug = _log.isDebugEnabled(); + if (multiple) { if (deliveryTag == 0) @@ -125,20 +132,17 @@ public class NonTransactionalContext implements TransactionalContext { public boolean callback(UnacknowledgedMessage message) throws AMQException { - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + _log.debug("Discarding message: " + message.getMessage().getMessageId()); } - else + if(message.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } + //Message has been ack so discard it. This will dequeue and decrement the reference. + message.discard(_storeContext); + return false; } @@ -159,20 +163,17 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.drainTo(acked, deliveryTag); for (UnacknowledgedMessage msg : acked) { - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } - else + if(msg.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); } } } @@ -189,27 +190,31 @@ public class NonTransactionalContext implements TransactionalContext _channel.getChannelId()); } - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } - else + if(msg.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } - if (_log.isDebugEnabled()) + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); + + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + msg.getMessage().getMessageId()); } } + + if(_inTran) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } + } public void messageFullyReceived(boolean persistent) throws AMQException |