summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java143
1 files changed, 83 insertions, 60 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7c9276dbdc..a04c743be1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -21,21 +21,29 @@ package org.apache.qpid.server.txn;
*/
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.BaseQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.AMQException;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
+/**
+ * A concrete implementation of ServerTransaction where enqueue/dequeue
+ * operations share a single long-lived transaction.
+ *
+ * The caller is responsible for invoking commit() (or rollback()) as necessary.
+ */
public class LocalTransaction implements ServerTransaction
{
- private final List<Action> _postCommitActions = new ArrayList<Action>();
+ protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+
+ private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
@@ -45,17 +53,23 @@ public class LocalTransaction implements ServerTransaction
_transactionLog = transactionLog;
}
- public void addPostCommitAction(Action postCommitAction)
+ public void addPostTransactionAction(Action postTransactionAction)
{
- _postCommitActions.add(postCommitAction);
+ _postTransactionActions.add(postTransactionAction);
}
- public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ _postTransactionActions.add(postTransactionAction);
+
if(message.isPersistent() && queue.isDurable())
{
try
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
beginTranIfNecessary();
_transaction.dequeueMessage(queue, message.getMessageNumber());
@@ -63,23 +77,31 @@ public class LocalTransaction implements ServerTransaction
}
catch(AMQException e)
{
+ _logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
}
- _postCommitActions.add(postCommitAction);
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postCommitAction)
+ public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
+ _postTransactionActions.add(postTransactionAction);
+
try
{
for(QueueEntry entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- AMQQueue queue = entry.getQueue();
+ BaseQueue queue = entry.getQueue();
+
if(message.isPersistent() && queue.isDurable())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
beginTranIfNecessary();
_transaction.dequeueMessage(queue, message.getMessageNumber());
}
@@ -88,9 +110,9 @@ public class LocalTransaction implements ServerTransaction
}
catch(AMQException e)
{
+ _logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
- _postCommitActions.add(postCommitAction);
}
@@ -98,7 +120,7 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- for(Action action : _postCommitActions)
+ for(Action action : _postTransactionActions)
{
action.onRollback();
}
@@ -107,14 +129,20 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- _transaction.abortTran();
+ if (_transaction != null)
+ {
+ _transaction.abortTran();
+ }
}
- catch (Exception e1)
+ catch (Exception abortException)
{
- // TODO could try to chain the information to the original error
+ _logger.error("Abort transaction failed while trying to handle previous error", abortException);
+ }
+ finally
+ {
+ _transaction = null;
+ _postTransactionActions.clear();
}
- _transaction = null;
- _postCommitActions.clear();
}
throw new RuntimeException(e);
@@ -122,6 +150,7 @@ public class LocalTransaction implements ServerTransaction
private void beginTranIfNecessary()
{
+
if(_transaction == null)
{
try
@@ -135,52 +164,50 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ _postTransactionActions.add(postTransactionAction);
+
if(message.isPersistent() && queue.isDurable())
{
- beginTranIfNecessary();
try
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ beginTranIfNecessary();
_transaction.enqueueMessage(queue, message.getMessageNumber());
}
catch (Exception e)
{
+ _logger.error("Error during message enqueue", e);
+
tidyUpOnError(e);
}
}
- _postCommitActions.add(postCommitAction);
-
-
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
-
+ _postTransactionActions.add(postTransactionAction);
if(message.isPersistent())
{
- if(_transaction == null)
- {
- for(BaseQueue queue : queues)
- {
- if(queue.isDurable())
- {
- beginTranIfNecessary();
- break;
- }
- }
-
-
- }
-
-
try
{
for(BaseQueue queue : queues)
{
if(queue.isDurable())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
+ }
+
+
+ beginTranIfNecessary();
_transaction.enqueueMessage(queue, message.getMessageNumber());
}
}
@@ -188,12 +215,11 @@ public class LocalTransaction implements ServerTransaction
}
catch (Exception e)
{
+ _logger.error("Error during message enqueue", e);
+
tidyUpOnError(e);
}
}
- _postCommitActions.add(postCommitAction);
-
-
}
public void commit()
@@ -202,55 +228,52 @@ public class LocalTransaction implements ServerTransaction
{
if(_transaction != null)
{
-
_transaction.commitTran();
}
- for(Action action : _postCommitActions)
+ for(Action action : _postTransactionActions)
{
action.postCommit();
}
}
catch (Exception e)
{
- for(Action action : _postCommitActions)
+ _logger.error("Failed to commit transaction", e);
+
+ for(Action action : _postTransactionActions)
{
action.onRollback();
}
- //TODO
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to commit transaction", e);
}
finally
{
_transaction = null;
- _postCommitActions.clear();
+ _postTransactionActions.clear();
}
}
public void rollback()
{
-
try
{
if(_transaction != null)
{
-
_transaction.abortTran();
}
}
catch (AMQException e)
{
- //TODO
- e.printStackTrace();
- throw new RuntimeException(e);
+ _logger.error("Failed to rollback transaction", e);
+ throw new RuntimeException("Failed to rollback transaction", e);
}
finally
{
try
{
- for(Action action : _postCommitActions)
+ for(Action action : _postTransactionActions)
{
action.onRollback();
}
@@ -258,7 +281,7 @@ public class LocalTransaction implements ServerTransaction
finally
{
_transaction = null;
- _postCommitActions.clear();
+ _postTransactionActions.clear();
}
}
}