summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-11 15:42:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-11 15:42:45 +0000
commit4a5cad728b45af81f64d21901968380142d7d111 (patch)
tree0b11f89e2b9d5f4d4c085092a4bcd608501b3f51
parent531f130c20f73c08ffc963c1e15c7b98cea05d8f (diff)
downloadqpid-python-4a5cad728b45af81f64d21901968380142d7d111.tar.gz
Copy over QPID-925
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655326 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java7
2 files changed, 30 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 55d636696d..cb3aa5259a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
@@ -37,7 +38,7 @@ public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
- private final List<Long> _individual = new LinkedList<Long>();
+ private List<Long> _individual;
private long _deliveryTag;
private boolean _multiple;
@@ -50,7 +51,10 @@ public class TxAck implements TxnOp
{
if (!multiple)
{
-
+ if(_individual == null)
+ {
+ _individual = new ArrayList<Long>();
+ }
//have acked a single message that is not part of
//the previously acked region so record
//individually
@@ -67,26 +71,33 @@ public class TxAck implements TxnOp
public void consolidate()
{
- //lookup all the unacked messages that have been acked in this transaction
- if (_multiple)
+ if(_unacked.isEmpty())
{
- //get all the unacked messages for the accumulated
- //multiple acks
- _map.collect(_deliveryTag, true, _unacked);
- }
- //get any unacked messages for individual acks outside the
- //range covered by multiple acks
- for (long tag : _individual)
- {
- if(_deliveryTag < tag)
+ //lookup all the unacked messages that have been acked in this transaction
+ if (_multiple)
+ {
+ //get all the unacked messages for the accumulated
+ //multiple acks
+ _map.collect(_deliveryTag, true, _unacked);
+ }
+ if(_individual != null)
{
- _map.collect(tag, false, _unacked);
+ //get any unacked messages for individual acks outside the
+ //range covered by multiple acks
+ for (long tag : _individual)
+ {
+ if(_deliveryTag < tag)
+ {
+ _map.collect(tag, false, _unacked);
+ }
+ }
}
}
}
public boolean checkPersistent() throws AMQException
{
+ consolidate();
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
for (QueueEntry msg : _unacked.values())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 4234820480..ad8303ec5d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -176,8 +176,7 @@ public class LocalTransactionalContext implements TransactionalContext
// as new acks come in. If this is the first ack in the txn
// we will need to create and enlist the op.
if (_ackOp == null)
- {
- beginTranIfNecessary();
+ {
_ackOp = new TxAck(unacknowledgedMessageMap);
_txnBuffer.enlist(_ackOp);
}
@@ -192,6 +191,10 @@ public class LocalTransactionalContext implements TransactionalContext
{
_ackOp.update(deliveryTag, multiple);
}
+ if(!_inTran && _ackOp.checkPersistent())
+ {
+ beginTranIfNecessary();
+ }
}
public void messageFullyReceived(boolean persistent) throws AMQException