diff options
Diffstat (limited to 'java')
3 files changed, 39 insertions, 9 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index f009e52488..f863a3e794 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -199,6 +199,12 @@ public class AMQChannel { if (_transactional) { + //don't create a transaction unless needed + if(_currentMessage.isPersistent()) + { + _txnBuffer.setPersistentMessageRecevied(); + } + //don't route this until commit _txnBuffer.enlist(new Publish(_currentMessage)); _currentMessage = null; @@ -605,6 +611,11 @@ public class AMQChannel _msg = msg; } + public boolean isPersistent() throws AMQException + { + return _msg.isPersistent(); + } + public void commit() throws AMQException { _exchanges.routeContent(_msg); diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java index a4ff453720..ade832ae5d 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java @@ -337,6 +337,8 @@ public class AMQMessage { throw new AMQException("Cannot determine delivery mode of message. Content header not found."); } + + //todo remove literal values to a constant file such as AMQConstants in common return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; } diff --git a/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java index 9a46afafe5..e3897b3725 100644 --- a/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java @@ -17,6 +17,7 @@ */ package org.apache.qpid.server.txn; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; @@ -25,21 +26,32 @@ import java.util.List; public class TxnBuffer { + private boolean _persistentMessageRecevied = false; private final MessageStore _store; private final List<TxnOp> _ops = new ArrayList<TxnOp>(); + private static final Logger _log = Logger.getLogger(TxnBuffer.class); public TxnBuffer(MessageStore store) { _store = store; } + public void setPersistentMessageRecevied() + { + _persistentMessageRecevied = true; + } + public void commit() throws AMQException { - _store.beginTran(); + if (_persistentMessageRecevied) + { + _log.info("Begin Transaction."); + _store.beginTran(); + } boolean failed = true; try { - for(TxnOp op : _ops) + for (TxnOp op : _ops) { op.commit(); } @@ -48,20 +60,25 @@ public class TxnBuffer } finally { - if(failed) - { - _store.abortTran(); - } - else + if (_persistentMessageRecevied) { - _store.commitTran(); + if (failed) + { + _log.info("Transaction Failed"); + _store.abortTran(); + } + else + { + _log.info("Transaction Succeeded"); + _store.commitTran(); + } } } } public void rollback() throws AMQException { - for(TxnOp op : _ops) + for (TxnOp op : _ops) { op.rollback(); } |