summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:11:30 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:11:30 +0000
commit49dc07c6d7ba26d6a250466e2679feb523e02dd0 (patch)
treeec1c15ab0b3b8d9733cf957cd825710928116062 /java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
parent449475309d147dcc64aba3fe31dc9432e45659f8 (diff)
downloadqpid-python-49dc07c6d7ba26d6a250466e2679feb523e02dd0.tar.gz
QPID-2985: Add producer configurable transaction timeouts
Port of QPID-2864 changes from 0.5.x-dev branch to trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1079042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java59
1 files changed, 46 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index a04c743be1..f9dac782a6 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
*
*/
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.TransactionLog;
*/
public class LocalTransaction implements ServerTransaction
{
- protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
+ private long _txnStartTime = 0L;
public LocalTransaction(TransactionLog transactionLog)
{
_transactionLog = transactionLog;
}
+
+ public boolean inTransaction()
+ {
+ return _transaction != null;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return _txnStartTime;
+ }
public void addPostTransactionAction(Action postTransactionAction)
{
@@ -89,7 +105,6 @@ public class LocalTransaction implements ServerTransaction
try
{
-
for(QueueEntry entry : queueEntries)
{
ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements ServerTransaction
_logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
-
}
private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
@@ -193,8 +206,25 @@ public class LocalTransaction implements ServerTransaction
{
_postTransactionActions.add(postTransactionAction);
+ if (_txnStartTime == 0L)
+ {
+ _txnStartTime = System.currentTimeMillis();
+ }
+
if(message.isPersistent())
{
+ if(_transaction == null)
+ {
+ for(BaseQueue queue : queues)
+ {
+ if(queue.isDurable())
+ {
+ beginTranIfNecessary();
+ break;
+ }
+ }
+ }
+
try
{
for(BaseQueue queue : queues)
@@ -248,17 +278,14 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
-
}
public void rollback()
{
try
{
-
if(_transaction != null)
{
_transaction.abortTran();
@@ -280,9 +307,15 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
}
+
+ private void resetDetails()
+ {
+ _transaction = null;
+ _postTransactionActions.clear();
+ _txnStartTime = 0L;
+ }
}