From f86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 16 Apr 2008 09:34:22 +0000 Subject: QPID-925 : Only begin store transactions when there is a persistent message to be committed git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648645 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/ack/TxAck.java | 33 ++++++++++++++++++---- .../qpid/server/txn/LocalTransactionalContext.java | 7 ++++- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c62a7880a8..6ad704a5d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.ack; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; @@ -33,8 +34,8 @@ import org.apache.qpid.server.txn.TxnOp; public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; - private final List _unacked = new LinkedList(); - private final List _individual = new LinkedList(); + private final List _unacked = new ArrayList(); + private List _individual; private long _deliveryTag; private boolean _multiple; @@ -47,6 +48,10 @@ public class TxAck implements TxnOp { if (!multiple) { + if(_individual == null) + { + _individual = new ArrayList(); + } //have acked a single message that is not part of //the previously acked region so record //individually @@ -59,30 +64,46 @@ public class TxAck implements TxnOp _deliveryTag = deliveryTag; _multiple = true; } + _unacked.clear(); } public void consolidate() + { + if(_unacked.isEmpty()) + { + consolidate(_unacked); + } + + } + + private void consolidate(List unacked) { //lookup all the unacked messages that have been acked in this transaction if (_multiple) { //get all the unacked messages for the accumulated //multiple acks - _map.collect(_deliveryTag, true, _unacked); + _map.collect(_deliveryTag, true, unacked); } //get any unacked messages for individual acks outside the //range covered by multiple acks - for (long tag : _individual) + if(_individual != null) { - if(_deliveryTag < tag) + for (Long tag : _individual) { - _map.collect(tag, false, _unacked); + if(_deliveryTag < tag) + { + _map.collect(tag, false, unacked); + } } } } public boolean checkPersistent() throws AMQException { + + + consolidate(); //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: for (UnacknowledgedMessage msg : _unacked) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index b12afd9a41..2307b94566 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -148,8 +148,9 @@ public class LocalTransactionalContext implements TransactionalContext // we will need to create and enlist the op. if (_ackOp == null) { - beginTranIfNecessary(); + _ackOp = new TxAck(unacknowledgedMessageMap); + _txnBuffer.enlist(_ackOp); } // update the op to include this ack request @@ -163,6 +164,10 @@ public class LocalTransactionalContext implements TransactionalContext { _ackOp.update(deliveryTag, multiple); } + if(!_inTran && _ackOp.checkPersistent()) + { + beginTranIfNecessary(); + } } public void messageFullyReceived(boolean persistent) throws AMQException -- cgit v1.2.1