summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java9
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java20
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java77
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java11
5 files changed, 97 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index efd7850a49..43e60c8e13 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
_futureRecorder = recorder;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index e5a7df6880..8a9479a2d4 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -52,11 +52,18 @@ public class AutoCommitTransaction implements ServerTransaction
_messageStore = transactionLog;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -178,7 +185,7 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -270,4 +277,6 @@ public class AutoCommitTransaction implements ServerTransaction
}
}
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
index 05d0110e9b..ab987f0fb9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -39,10 +38,6 @@ public class DistributedTransaction implements ServerTransaction
private final AutoCommitTransaction _autoCommitTransaction;
- private volatile Transaction _transaction;
-
- private long _txnStartTime = 0L;
-
private DtxBranch _branch;
private AMQSessionModel _session;
private VirtualHost _vhost;
@@ -55,9 +50,16 @@ public class DistributedTransaction implements ServerTransaction
_autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
}
+ @Override
public long getTransactionStartTime()
{
- return _txnStartTime;
+ return 0;
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0;
}
public void addPostTransactionAction(Action postTransactionAction)
@@ -107,7 +109,7 @@ public class DistributedTransaction implements ServerTransaction
{
_branch.enqueue(queue, message);
_branch.addPostTransactionAcion(postTransactionAction);
- enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+ enqueue(Collections.singletonList(queue), message, postTransactionAction);
}
else
{
@@ -116,7 +118,7 @@ public class DistributedTransaction implements ServerTransaction
}
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
- Action postTransactionAction, long currentTime)
+ Action postTransactionAction)
{
if(_branch != null)
{
@@ -128,7 +130,7 @@ public class DistributedTransaction implements ServerTransaction
}
else
{
- _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+ _autoCommitTransaction.enqueue(queues, message, postTransactionAction);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fbcff7e2c..afa7cb0fb4 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -49,25 +49,42 @@ public class LocalTransaction implements ServerTransaction
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile Transaction _transaction;
- private MessageStore _transactionLog;
- private long _txnStartTime = 0L;
+ private final ActivityTimeAccessor _activityTime;
+ private final MessageStore _transactionLog;
+ private volatile long _txnStartTime = 0L;
+ private volatile long _txnUpdateTime = 0l;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ this(transactionLog, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return System.currentTimeMillis();
+ }
+ });
}
-
- public boolean inTransaction()
+
+ public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
{
- return _transaction != null;
+ _transactionLog = transactionLog;
+ _activityTime = activityTime;
}
+ @Override
public long getTransactionStartTime()
{
return _txnStartTime;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return _txnUpdateTime;
+ }
+
public void addPostTransactionAction(Action postTransactionAction)
{
sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -104,6 +122,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
try
{
@@ -180,6 +199,7 @@ public class LocalTransaction implements ServerTransaction
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -189,7 +209,7 @@ public class LocalTransaction implements ServerTransaction
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -202,15 +222,11 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
-
- if (_txnStartTime == 0L)
- {
- _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
- }
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent())
{
@@ -224,8 +240,7 @@ public class LocalTransaction implements ServerTransaction
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
}
-
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -378,16 +393,24 @@ public class LocalTransaction implements ServerTransaction
}
throw new RuntimeException("Failed to commit transaction", e);
}
-
-
}
private void doPostTransactionActions()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+ }
+
for(int i = 0; i < _postTransactionActions.size(); i++)
{
_postTransactionActions.get(i).postCommit();
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Completed post transaction actions");
+ }
}
public void rollback()
@@ -427,16 +450,34 @@ public class LocalTransaction implements ServerTransaction
}
}
+ private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+ {
+ long currentTime = _activityTime.getActivityTime();
+
+ if (_txnStartTime == 0)
+ {
+ _txnStartTime = currentTime;
+ }
+ _txnUpdateTime = currentTime;
+ }
+
private void resetDetails()
{
_asyncTran = null;
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
+ _txnUpdateTime = 0;
}
public boolean isTransactional()
{
return true;
}
+
+ public interface ActivityTimeAccessor
+ {
+ long getActivityTime();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index c568ae67aa..8acac00479 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -55,11 +55,18 @@ public interface ServerTransaction
/**
* Return the time the current transaction started.
- *
+ *
* @return the time this transaction started or 0 if not in a transaction
*/
long getTransactionStartTime();
+ /**
+ * Return the time of the last activity on the current transaction.
+ *
+ * @return the time of the last activity or 0 if not in a transaction
+ */
+ long getTransactionUpdateTime();
+
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
@@ -92,7 +99,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
/**
* Commit the transaction represented by this object.