diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java | 309 |
1 files changed, 0 insertions, 309 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java deleted file mode 100755 index 946dbd7c28..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ /dev/null @@ -1,309 +0,0 @@ -package org.apache.qpid.server.txn; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A concrete implementation of ServerTransaction where enqueue/dequeue - * operations share a single long-lived transaction. - * - * The caller is responsible for invoking commit() (or rollback()) as necessary. - */ -public class LocalTransaction implements ServerTransaction -{ - protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class); - - private final List<Action> _postTransactionActions = new ArrayList<Action>(); - - private volatile TransactionLog.Transaction _transaction; - private TransactionLog _transactionLog; - private long _txnStartTime = 0L; - - public LocalTransaction(TransactionLog transactionLog) - { - _transactionLog = transactionLog; - } - - public boolean inTransaction() - { - return _transaction != null; - } - - public long getTransactionStartTime() - { - return _txnStartTime; - } - - public void addPostTransactionAction(Action postTransactionAction) - { - _postTransactionActions.add(postTransactionAction); - } - - public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) - { - _postTransactionActions.add(postTransactionAction); - - if(message.isPersistent() && queue.isDurable()) - { - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); - } - - beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message.getMessageNumber()); - - } - catch(AMQException e) - { - _logger.error("Error during message dequeues", e); - tidyUpOnError(e); - } - } - } - - public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) - { - _postTransactionActions.add(postTransactionAction); - - try - { - for(QueueEntry entry : queueEntries) - { - ServerMessage message = entry.getMessage(); - BaseQueue queue = entry.getQueue(); - - if(message.isPersistent() && queue.isDurable()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); - } - - beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message.getMessageNumber()); - } - - } - } - catch(AMQException e) - { - _logger.error("Error during message dequeues", e); - tidyUpOnError(e); - } - } - - private void tidyUpOnError(Exception e) - { - try - { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } - } - finally - { - try - { - if (_transaction != null) - { - _transaction.abortTran(); - } - } - catch (Exception abortException) - { - _logger.error("Abort transaction failed while trying to handle previous error", abortException); - } - finally - { - resetDetails(); - } - } - - throw new RuntimeException(e); - } - - private void beginTranIfNecessary() - { - - if(_transaction == null) - { - try - { - _transaction = _transactionLog.newTransaction(); - } - catch (Exception e) - { - tidyUpOnError(e); - } - } - } - - public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) - { - _postTransactionActions.add(postTransactionAction); - - if(message.isPersistent() && queue.isDurable()) - { - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); - } - - beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message.getMessageNumber()); - } - catch (Exception e) - { - _logger.error("Error during message enqueue", e); - - tidyUpOnError(e); - } - } - } - - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) - { - _postTransactionActions.add(postTransactionAction); - - if (_txnStartTime == 0L) - { - _txnStartTime = System.currentTimeMillis(); - } - - if(message.isPersistent()) - { - try - { - for(BaseQueue queue : queues) - { - if(queue.isDurable()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); - } - - - beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message.getMessageNumber()); - } - } - - } - catch (Exception e) - { - _logger.error("Error during message enqueue", e); - - tidyUpOnError(e); - } - } - } - - public void commit() - { - try - { - if(_transaction != null) - { - _transaction.commitTran(); - } - - for(Action action : _postTransactionActions) - { - action.postCommit(); - } - } - catch (Exception e) - { - _logger.error("Failed to commit transaction", e); - - for(Action action : _postTransactionActions) - { - action.onRollback(); - } - throw new RuntimeException("Failed to commit transaction", e); - } - finally - { - resetDetails(); - } - } - - public void rollback() - { - try - { - if(_transaction != null) - { - _transaction.abortTran(); - } - } - catch (AMQException e) - { - _logger.error("Failed to rollback transaction", e); - throw new RuntimeException("Failed to rollback transaction", e); - } - finally - { - try - { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } - } - finally - { - resetDetails(); - } - } - } - - private void resetDetails() - { - _transaction = null; - _postTransactionActions.clear(); - _txnStartTime = 0L; - } -} |