diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 3fbcff7e2c..afa7cb0fb4 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -49,25 +49,42 @@ public class LocalTransaction implements ServerTransaction private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile Transaction _transaction; - private MessageStore _transactionLog; - private long _txnStartTime = 0L; + private final ActivityTimeAccessor _activityTime; + private final MessageStore _transactionLog; + private volatile long _txnStartTime = 0L; + private volatile long _txnUpdateTime = 0l; private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) { - _transactionLog = transactionLog; + this(transactionLog, new ActivityTimeAccessor() + { + @Override + public long getActivityTime() + { + return System.currentTimeMillis(); + } + }); } - - public boolean inTransaction() + + public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime) { - return _transaction != null; + _transactionLog = transactionLog; + _activityTime = activityTime; } + @Override public long getTransactionStartTime() { return _txnStartTime; } + @Override + public long getTransactionUpdateTime() + { + return _txnUpdateTime; + } + public void addPostTransactionAction(Action postTransactionAction) { sync(); @@ -78,6 +95,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -104,6 +122,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); try { @@ -180,6 +199,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -189,7 +209,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); } - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -202,15 +222,11 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); - - if (_txnStartTime == 0L) - { - _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime; - } + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent()) { @@ -224,8 +240,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); } - - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -378,16 +393,24 @@ public class LocalTransaction implements ServerTransaction } throw new RuntimeException("Failed to commit transaction", e); } - - } private void doPostTransactionActions() { + if(_logger.isDebugEnabled()) + { + _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions"); + } + for(int i = 0; i < _postTransactionActions.size(); i++) { _postTransactionActions.get(i).postCommit(); } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Completed post transaction actions"); + } } public void rollback() @@ -427,16 +450,34 @@ public class LocalTransaction implements ServerTransaction } } + private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime() + { + long currentTime = _activityTime.getActivityTime(); + + if (_txnStartTime == 0) + { + _txnStartTime = currentTime; + } + _txnUpdateTime = currentTime; + } + private void resetDetails() { _asyncTran = null; _transaction = null; - _postTransactionActions.clear(); + _postTransactionActions.clear(); _txnStartTime = 0L; + _txnUpdateTime = 0; } public boolean isTransactional() { return true; } + + public interface ActivityTimeAccessor + { + long getActivityTime(); + } + } |