diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 06522740cc..ff6abdd58b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.List; @@ -45,12 +46,15 @@ public class LocalTransactionalContext implements TransactionalContext private final MessageStore _messageStore; + private final StoreContext _storeContext; + private boolean _inTran = false; - public LocalTransactionalContext(MessageStore messageStore, + public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages) { _messageStore = messageStore; + _storeContext = storeContext; _txnBuffer = txnBuffer; _returnMessages = returnMessages; _txnBuffer.enlist(new StoreMessageOperation(messageStore)); @@ -58,7 +62,7 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { - _txnBuffer.rollback(); + _txnBuffer.rollback(_storeContext); } public void deliver(AMQMessage message, AMQQueue queue) throws AMQException @@ -125,7 +129,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (!_inTran) { - _messageStore.beginTran(); + _messageStore.beginTran(_storeContext); _inTran = true; } } @@ -134,11 +138,11 @@ public class LocalTransactionalContext implements TransactionalContext { if (_ackOp != null) { - _ackOp.consolidate(); + _ackOp.consolidate(); //already enlisted, after commit will reset regardless of outcome _ackOp = null; } - _txnBuffer.commit(); + _txnBuffer.commit(_storeContext); } } |