summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java77
1 files changed, 59 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fbcff7e2c..afa7cb0fb4 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -49,25 +49,42 @@ public class LocalTransaction implements ServerTransaction
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile Transaction _transaction;
- private MessageStore _transactionLog;
- private long _txnStartTime = 0L;
+ private final ActivityTimeAccessor _activityTime;
+ private final MessageStore _transactionLog;
+ private volatile long _txnStartTime = 0L;
+ private volatile long _txnUpdateTime = 0l;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ this(transactionLog, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return System.currentTimeMillis();
+ }
+ });
}
-
- public boolean inTransaction()
+
+ public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
{
- return _transaction != null;
+ _transactionLog = transactionLog;
+ _activityTime = activityTime;
}
+ @Override
public long getTransactionStartTime()
{
return _txnStartTime;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return _txnUpdateTime;
+ }
+
public void addPostTransactionAction(Action postTransactionAction)
{
sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -104,6 +122,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
try
{
@@ -180,6 +199,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -189,7 +209,7 @@ public class LocalTransaction implements ServerTransaction
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -202,15 +222,11 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
-
- if (_txnStartTime == 0L)
- {
- _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
- }
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent())
{
@@ -224,8 +240,7 @@ public class LocalTransaction implements ServerTransaction
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
}
-
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -378,16 +393,24 @@ public class LocalTransaction implements ServerTransaction
}
throw new RuntimeException("Failed to commit transaction", e);
}
-
-
}
private void doPostTransactionActions()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+ }
+
for(int i = 0; i < _postTransactionActions.size(); i++)
{
_postTransactionActions.get(i).postCommit();
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Completed post transaction actions");
+ }
}
public void rollback()
@@ -427,16 +450,34 @@ public class LocalTransaction implements ServerTransaction
}
}
+ private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+ {
+ long currentTime = _activityTime.getActivityTime();
+
+ if (_txnStartTime == 0)
+ {
+ _txnStartTime = currentTime;
+ }
+ _txnUpdateTime = currentTime;
+ }
+
private void resetDetails()
{
_asyncTran = null;
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
+ _txnUpdateTime = 0;
}
public boolean isTransactional()
{
return true;
}
+
+ public interface ActivityTimeAccessor
+ {
+ long getActivityTime();
+ }
+
}