summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:34:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:34:22 +0000
commitf86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5 (patch)
treeccb5c7663e6b1480d81748db6d0d73a43076fc71
parente34b410d62899c362565bc7b3a0039604da7841d (diff)
downloadqpid-python-f86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5.tar.gz
QPID-925 : Only begin store transactions when there is a persistent message to be committed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648645 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java7
2 files changed, 33 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index c62a7880a8..6ad704a5d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.ack;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
@@ -33,8 +34,8 @@ import org.apache.qpid.server.txn.TxnOp;
public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
- private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
- private final List<Long> _individual = new LinkedList<Long>();
+ private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>();
+ private List<Long> _individual;
private long _deliveryTag;
private boolean _multiple;
@@ -47,6 +48,10 @@ public class TxAck implements TxnOp
{
if (!multiple)
{
+ if(_individual == null)
+ {
+ _individual = new ArrayList<Long>();
+ }
//have acked a single message that is not part of
//the previously acked region so record
//individually
@@ -59,30 +64,46 @@ public class TxAck implements TxnOp
_deliveryTag = deliveryTag;
_multiple = true;
}
+ _unacked.clear();
}
public void consolidate()
{
+ if(_unacked.isEmpty())
+ {
+ consolidate(_unacked);
+ }
+
+ }
+
+ private void consolidate(List<UnacknowledgedMessage> unacked)
+ {
//lookup all the unacked messages that have been acked in this transaction
if (_multiple)
{
//get all the unacked messages for the accumulated
//multiple acks
- _map.collect(_deliveryTag, true, _unacked);
+ _map.collect(_deliveryTag, true, unacked);
}
//get any unacked messages for individual acks outside the
//range covered by multiple acks
- for (long tag : _individual)
+ if(_individual != null)
{
- if(_deliveryTag < tag)
+ for (Long tag : _individual)
{
- _map.collect(tag, false, _unacked);
+ if(_deliveryTag < tag)
+ {
+ _map.collect(tag, false, unacked);
+ }
}
}
}
public boolean checkPersistent() throws AMQException
{
+
+
+ consolidate();
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
for (UnacknowledgedMessage msg : _unacked)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index b12afd9a41..2307b94566 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -148,8 +148,9 @@ public class LocalTransactionalContext implements TransactionalContext
// we will need to create and enlist the op.
if (_ackOp == null)
{
- beginTranIfNecessary();
+
_ackOp = new TxAck(unacknowledgedMessageMap);
+
_txnBuffer.enlist(_ackOp);
}
// update the op to include this ack request
@@ -163,6 +164,10 @@ public class LocalTransactionalContext implements TransactionalContext
{
_ackOp.update(deliveryTag, multiple);
}
+ if(!_inTran && _ackOp.checkPersistent())
+ {
+ beginTranIfNecessary();
+ }
}
public void messageFullyReceived(boolean persistent) throws AMQException