diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java | 139 |
1 files changed, 132 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 78fad3f629..4afaa39a0d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -21,26 +21,151 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.AMQException; + +import java.util.List; +import java.util.Collection; public class AutoCommitTransaction implements Transaction { + private final TransactionLog _transactionLog; - public void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction) + public AutoCommitTransaction(TransactionLog transactionLog) { - // store.remove enqueue - // store.commit - postCommitAction.postCommit(); + _transactionLog = transactionLog; } - public void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction) + + public void addPostCommitAction(Action postCommitAction) { - // store.add enqueue - // store.commit postCommitAction.postCommit(); } + public void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction) + { + + try + { + if(message.isPersistent() && queue.isDurable()) + { + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + _transactionLog.dequeueMessage(context, queue, message.getMessageNumber()); + // store.remove enqueue + // store.commit + _transactionLog.commitTran(context); + } + postCommitAction.postCommit(); + } + catch (AMQException e) + { + //TODO + postCommitAction.onRollback(); + throw new RuntimeException(e); + } + } + + public void dequeue(Collection<QueueEntry> ackedMessages, Action postCommitAction) + { + try + { + StoreContext context = null; + for(QueueEntry entry : ackedMessages) + { + ServerMessage message = entry.getMessage(); + AMQQueue queue = entry.getQueue(); + + if(message.isPersistent() && queue.isDurable()) + { + if(context == null) + { + context = new StoreContext(); + _transactionLog.beginTran(context); + } + _transactionLog.dequeueMessage(context, queue, message.getMessageNumber()); + } + + } + if(context != null) + { + _transactionLog.commitTran(context); + } + postCommitAction.postCommit(); + } + catch (AMQException e) + { + //TODO + postCommitAction.onRollback(); + throw new RuntimeException(e); + } + } + + + public void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction) + { + try + { + if(message.isPersistent() && queue.isDurable()) + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + _transactionLog.enqueueMessage(context, queue, message.getMessageNumber()); + _transactionLog.commitTran(context); + } + postCommitAction.postCommit(); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); + postCommitAction.onRollback(); + throw new RuntimeException(e); + } + + } + + public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction) + { + try + { + + if(message.isPersistent()) + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + Long id = message.getMessageNumber(); + for(AMQQueue q : queues) + { + if(q.isDurable()) + { + _transactionLog.enqueueMessage(context, q, id); + } + } + _transactionLog.commitTran(context); + + } + postCommitAction.postCommit(); + } + catch (AMQException e) + { + //TODO + postCommitAction.onRollback(); + throw new RuntimeException(e); + } + + } + public void commit() { |