diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn')
5 files changed, 97 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index efd7850a49..43e60c8e13 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction implements ServerTransaction _futureRecorder = recorder; } + @Override public long getTransactionStartTime() { return 0L; } + @Override + public long getTransactionUpdateTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. @@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index e5a7df6880..8a9479a2d4 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -52,11 +52,18 @@ public class AutoCommitTransaction implements ServerTransaction _messageStore = transactionLog; } + @Override public long getTransactionStartTime() { return 0L; } + @Override + public long getTransactionUpdateTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. @@ -178,7 +185,7 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { Transaction txn = null; try @@ -270,4 +277,6 @@ public class AutoCommitTransaction implements ServerTransaction } } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 05d0110e9b..ab987f0fb9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -39,10 +38,6 @@ public class DistributedTransaction implements ServerTransaction private final AutoCommitTransaction _autoCommitTransaction; - private volatile Transaction _transaction; - - private long _txnStartTime = 0L; - private DtxBranch _branch; private AMQSessionModel _session; private VirtualHost _vhost; @@ -55,9 +50,16 @@ public class DistributedTransaction implements ServerTransaction _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore()); } + @Override public long getTransactionStartTime() { - return _txnStartTime; + return 0; + } + + @Override + public long getTransactionUpdateTime() + { + return 0; } public void addPostTransactionAction(Action postTransactionAction) @@ -107,7 +109,7 @@ public class DistributedTransaction implements ServerTransaction { _branch.enqueue(queue, message); _branch.addPostTransactionAcion(postTransactionAction); - enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis()); + enqueue(Collections.singletonList(queue), message, postTransactionAction); } else { @@ -116,7 +118,7 @@ public class DistributedTransaction implements ServerTransaction } public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, - Action postTransactionAction, long currentTime) + Action postTransactionAction) { if(_branch != null) { @@ -128,7 +130,7 @@ public class DistributedTransaction implements ServerTransaction } else { - _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime); + _autoCommitTransaction.enqueue(queues, message, postTransactionAction); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 3fbcff7e2c..afa7cb0fb4 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -49,25 +49,42 @@ public class LocalTransaction implements ServerTransaction private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile Transaction _transaction; - private MessageStore _transactionLog; - private long _txnStartTime = 0L; + private final ActivityTimeAccessor _activityTime; + private final MessageStore _transactionLog; + private volatile long _txnStartTime = 0L; + private volatile long _txnUpdateTime = 0l; private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) { - _transactionLog = transactionLog; + this(transactionLog, new ActivityTimeAccessor() + { + @Override + public long getActivityTime() + { + return System.currentTimeMillis(); + } + }); } - - public boolean inTransaction() + + public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime) { - return _transaction != null; + _transactionLog = transactionLog; + _activityTime = activityTime; } + @Override public long getTransactionStartTime() { return _txnStartTime; } + @Override + public long getTransactionUpdateTime() + { + return _txnUpdateTime; + } + public void addPostTransactionAction(Action postTransactionAction) { sync(); @@ -78,6 +95,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -104,6 +122,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); try { @@ -180,6 +199,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -189,7 +209,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); } - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -202,15 +222,11 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); - - if (_txnStartTime == 0L) - { - _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime; - } + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent()) { @@ -224,8 +240,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); } - - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -378,16 +393,24 @@ public class LocalTransaction implements ServerTransaction } throw new RuntimeException("Failed to commit transaction", e); } - - } private void doPostTransactionActions() { + if(_logger.isDebugEnabled()) + { + _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions"); + } + for(int i = 0; i < _postTransactionActions.size(); i++) { _postTransactionActions.get(i).postCommit(); } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Completed post transaction actions"); + } } public void rollback() @@ -427,16 +450,34 @@ public class LocalTransaction implements ServerTransaction } } + private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime() + { + long currentTime = _activityTime.getActivityTime(); + + if (_txnStartTime == 0) + { + _txnStartTime = currentTime; + } + _txnUpdateTime = currentTime; + } + private void resetDetails() { _asyncTran = null; _transaction = null; - _postTransactionActions.clear(); + _postTransactionActions.clear(); _txnStartTime = 0L; + _txnUpdateTime = 0; } public boolean isTransactional() { return true; } + + public interface ActivityTimeAccessor + { + long getActivityTime(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index c568ae67aa..8acac00479 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -55,11 +55,18 @@ public interface ServerTransaction /** * Return the time the current transaction started. - * + * * @return the time this transaction started or 0 if not in a transaction */ long getTransactionStartTime(); + /** + * Return the time of the last activity on the current transaction. + * + * @return the time of the last activity or 0 if not in a transaction + */ + long getTransactionUpdateTime(); + /** * Register an Action for execution after transaction commit or rollback. Actions * will be executed in the order in which they are registered. @@ -92,7 +99,7 @@ public interface ServerTransaction * * Store operations will result only for a persistent messages on durable queues. */ - void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime); + void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction); /** * Commit the transaction represented by this object. |