diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:34:22 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:34:22 +0000 |
commit | f86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5 (patch) | |
tree | ccb5c7663e6b1480d81748db6d0d73a43076fc71 | |
parent | e34b410d62899c362565bc7b3a0039604da7841d (diff) | |
download | qpid-python-f86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5.tar.gz |
QPID-925 : Only begin store transactions when there is a persistent message to be committed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648645 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 33 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java | 7 |
2 files changed, 33 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c62a7880a8..6ad704a5d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.ack; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; @@ -33,8 +34,8 @@ import org.apache.qpid.server.txn.TxnOp; public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; - private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>(); - private final List<Long> _individual = new LinkedList<Long>(); + private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>(); + private List<Long> _individual; private long _deliveryTag; private boolean _multiple; @@ -47,6 +48,10 @@ public class TxAck implements TxnOp { if (!multiple) { + if(_individual == null) + { + _individual = new ArrayList<Long>(); + } //have acked a single message that is not part of //the previously acked region so record //individually @@ -59,30 +64,46 @@ public class TxAck implements TxnOp _deliveryTag = deliveryTag; _multiple = true; } + _unacked.clear(); } public void consolidate() { + if(_unacked.isEmpty()) + { + consolidate(_unacked); + } + + } + + private void consolidate(List<UnacknowledgedMessage> unacked) + { //lookup all the unacked messages that have been acked in this transaction if (_multiple) { //get all the unacked messages for the accumulated //multiple acks - _map.collect(_deliveryTag, true, _unacked); + _map.collect(_deliveryTag, true, unacked); } //get any unacked messages for individual acks outside the //range covered by multiple acks - for (long tag : _individual) + if(_individual != null) { - if(_deliveryTag < tag) + for (Long tag : _individual) { - _map.collect(tag, false, _unacked); + if(_deliveryTag < tag) + { + _map.collect(tag, false, unacked); + } } } } public boolean checkPersistent() throws AMQException { + + + consolidate(); //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: for (UnacknowledgedMessage msg : _unacked) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index b12afd9a41..2307b94566 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -148,8 +148,9 @@ public class LocalTransactionalContext implements TransactionalContext // we will need to create and enlist the op. if (_ackOp == null) { - beginTranIfNecessary(); + _ackOp = new TxAck(unacknowledgedMessageMap); + _txnBuffer.enlist(_ackOp); } // update the op to include this ack request @@ -163,6 +164,10 @@ public class LocalTransactionalContext implements TransactionalContext { _ackOp.update(deliveryTag, multiple); } + if(!_inTran && _ackOp.checkPersistent()) + { + beginTranIfNecessary(); + } } public void messageFullyReceived(boolean persistent) throws AMQException |