diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:42:45 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:42:45 +0000 |
commit | 4a5cad728b45af81f64d21901968380142d7d111 (patch) | |
tree | 0b11f89e2b9d5f4d4c085092a4bcd608501b3f51 | |
parent | 531f130c20f73c08ffc963c1e15c7b98cea05d8f (diff) | |
download | qpid-python-4a5cad728b45af81f64d21901968380142d7d111.tar.gz |
Copy over QPID-925
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655326 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 39 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java | 7 |
2 files changed, 30 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 55d636696d..cb3aa5259a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.HashMap; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; @@ -37,7 +38,7 @@ public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>(); - private final List<Long> _individual = new LinkedList<Long>(); + private List<Long> _individual; private long _deliveryTag; private boolean _multiple; @@ -50,7 +51,10 @@ public class TxAck implements TxnOp { if (!multiple) { - + if(_individual == null) + { + _individual = new ArrayList<Long>(); + } //have acked a single message that is not part of //the previously acked region so record //individually @@ -67,26 +71,33 @@ public class TxAck implements TxnOp public void consolidate() { - //lookup all the unacked messages that have been acked in this transaction - if (_multiple) + if(_unacked.isEmpty()) { - //get all the unacked messages for the accumulated - //multiple acks - _map.collect(_deliveryTag, true, _unacked); - } - //get any unacked messages for individual acks outside the - //range covered by multiple acks - for (long tag : _individual) - { - if(_deliveryTag < tag) + //lookup all the unacked messages that have been acked in this transaction + if (_multiple) + { + //get all the unacked messages for the accumulated + //multiple acks + _map.collect(_deliveryTag, true, _unacked); + } + if(_individual != null) { - _map.collect(tag, false, _unacked); + //get any unacked messages for individual acks outside the + //range covered by multiple acks + for (long tag : _individual) + { + if(_deliveryTag < tag) + { + _map.collect(tag, false, _unacked); + } + } } } } public boolean checkPersistent() throws AMQException { + consolidate(); //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: for (QueueEntry msg : _unacked.values()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 4234820480..ad8303ec5d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -176,8 +176,7 @@ public class LocalTransactionalContext implements TransactionalContext // as new acks come in. If this is the first ack in the txn // we will need to create and enlist the op. if (_ackOp == null) - { - beginTranIfNecessary(); + { _ackOp = new TxAck(unacknowledgedMessageMap); _txnBuffer.enlist(_ackOp); } @@ -192,6 +191,10 @@ public class LocalTransactionalContext implements TransactionalContext { _ackOp.update(deliveryTag, multiple); } + if(!_inTran && _ackOp.checkPersistent()) + { + beginTranIfNecessary(); + } } public void messageFullyReceived(boolean persistent) throws AMQException |