summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessage.java2
-rw-r--r--java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java35
3 files changed, 39 insertions, 9 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index f009e52488..f863a3e794 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -199,6 +199,12 @@ public class AMQChannel
{
if (_transactional)
{
+ //don't create a transaction unless needed
+ if(_currentMessage.isPersistent())
+ {
+ _txnBuffer.setPersistentMessageRecevied();
+ }
+
//don't route this until commit
_txnBuffer.enlist(new Publish(_currentMessage));
_currentMessage = null;
@@ -605,6 +611,11 @@ public class AMQChannel
_msg = msg;
}
+ public boolean isPersistent() throws AMQException
+ {
+ return _msg.isPersistent();
+ }
+
public void commit() throws AMQException
{
_exchanges.routeContent(_msg);
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index a4ff453720..ade832ae5d 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -337,6 +337,8 @@ public class AMQMessage
{
throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
}
+
+ //todo remove literal values to a constant file such as AMQConstants in common
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
&&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
diff --git a/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
index 9a46afafe5..e3897b3725 100644
--- a/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
@@ -25,21 +26,32 @@ import java.util.List;
public class TxnBuffer
{
+ private boolean _persistentMessageRecevied = false;
private final MessageStore _store;
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
+ private static final Logger _log = Logger.getLogger(TxnBuffer.class);
public TxnBuffer(MessageStore store)
{
_store = store;
}
+ public void setPersistentMessageRecevied()
+ {
+ _persistentMessageRecevied = true;
+ }
+
public void commit() throws AMQException
{
- _store.beginTran();
+ if (_persistentMessageRecevied)
+ {
+ _log.info("Begin Transaction.");
+ _store.beginTran();
+ }
boolean failed = true;
try
{
- for(TxnOp op : _ops)
+ for (TxnOp op : _ops)
{
op.commit();
}
@@ -48,20 +60,25 @@ public class TxnBuffer
}
finally
{
- if(failed)
- {
- _store.abortTran();
- }
- else
+ if (_persistentMessageRecevied)
{
- _store.commitTran();
+ if (failed)
+ {
+ _log.info("Transaction Failed");
+ _store.abortTran();
+ }
+ else
+ {
+ _log.info("Transaction Succeeded");
+ _store.commitTran();
+ }
}
}
}
public void rollback() throws AMQException
{
- for(TxnOp op : _ops)
+ for (TxnOp op : _ops)
{
op.rollback();
}