diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java | 143 |
1 files changed, 83 insertions, 60 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 7c9276dbdc..a04c743be1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -21,21 +21,29 @@ package org.apache.qpid.server.txn; */ -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.BaseQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.AMQException; - -import java.util.List; -import java.util.ArrayList; -import java.util.Collection; +/** + * A concrete implementation of ServerTransaction where enqueue/dequeue + * operations share a single long-lived transaction. + * + * The caller is responsible for invoking commit() (or rollback()) as necessary. + */ public class LocalTransaction implements ServerTransaction { - private final List<Action> _postCommitActions = new ArrayList<Action>(); + protected static final Logger _logger = Logger.getLogger(LocalTransaction.class); + + private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile TransactionLog.Transaction _transaction; private TransactionLog _transactionLog; @@ -45,17 +53,23 @@ public class LocalTransaction implements ServerTransaction _transactionLog = transactionLog; } - public void addPostCommitAction(Action postCommitAction) + public void addPostTransactionAction(Action postTransactionAction) { - _postCommitActions.add(postCommitAction); + _postTransactionActions.add(postTransactionAction); } - public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + if(message.isPersistent() && queue.isDurable()) { try { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } beginTranIfNecessary(); _transaction.dequeueMessage(queue, message.getMessageNumber()); @@ -63,23 +77,31 @@ public class LocalTransaction implements ServerTransaction } catch(AMQException e) { + _logger.error("Error during message dequeues", e); tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); } - public void dequeue(Collection<QueueEntry> queueEntries, Action postCommitAction) + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + try { for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); - AMQQueue queue = entry.getQueue(); + BaseQueue queue = entry.getQueue(); + if(message.isPersistent() && queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + beginTranIfNecessary(); _transaction.dequeueMessage(queue, message.getMessageNumber()); } @@ -88,9 +110,9 @@ public class LocalTransaction implements ServerTransaction } catch(AMQException e) { + _logger.error("Error during message dequeues", e); tidyUpOnError(e); } - _postCommitActions.add(postCommitAction); } @@ -98,7 +120,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.onRollback(); } @@ -107,14 +129,20 @@ public class LocalTransaction implements ServerTransaction { try { - _transaction.abortTran(); + if (_transaction != null) + { + _transaction.abortTran(); + } } - catch (Exception e1) + catch (Exception abortException) { - // TODO could try to chain the information to the original error + _logger.error("Abort transaction failed while trying to handle previous error", abortException); + } + finally + { + _transaction = null; + _postTransactionActions.clear(); } - _transaction = null; - _postCommitActions.clear(); } throw new RuntimeException(e); @@ -122,6 +150,7 @@ public class LocalTransaction implements ServerTransaction private void beginTranIfNecessary() { + if(_transaction == null) { try @@ -135,52 +164,50 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + if(message.isPersistent() && queue.isDurable()) { - beginTranIfNecessary(); try { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message.getMessageNumber()); } catch (Exception e) { + _logger.error("Error during message enqueue", e); + tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); - - } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { - + _postTransactionActions.add(postTransactionAction); if(message.isPersistent()) { - if(_transaction == null) - { - for(BaseQueue queue : queues) - { - if(queue.isDurable()) - { - beginTranIfNecessary(); - break; - } - } - - - } - - try { for(BaseQueue queue : queues) { if(queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); + } + + + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message.getMessageNumber()); } } @@ -188,12 +215,11 @@ public class LocalTransaction implements ServerTransaction } catch (Exception e) { + _logger.error("Error during message enqueue", e); + tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); - - } public void commit() @@ -202,55 +228,52 @@ public class LocalTransaction implements ServerTransaction { if(_transaction != null) { - _transaction.commitTran(); } - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.postCommit(); } } catch (Exception e) { - for(Action action : _postCommitActions) + _logger.error("Failed to commit transaction", e); + + for(Action action : _postTransactionActions) { action.onRollback(); } - //TODO - throw new RuntimeException(e); + throw new RuntimeException("Failed to commit transaction", e); } finally { _transaction = null; - _postCommitActions.clear(); + _postTransactionActions.clear(); } } public void rollback() { - try { if(_transaction != null) { - _transaction.abortTran(); } } catch (AMQException e) { - //TODO - e.printStackTrace(); - throw new RuntimeException(e); + _logger.error("Failed to rollback transaction", e); + throw new RuntimeException("Failed to rollback transaction", e); } finally { try { - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.onRollback(); } @@ -258,7 +281,7 @@ public class LocalTransaction implements ServerTransaction finally { _transaction = null; - _postCommitActions.clear(); + _postTransactionActions.clear(); } } } |