diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3b17da5af7..fe5da20fa5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -200,16 +200,22 @@ public class AMQChannel implements SessionConfig private void incrementOutstandingTxnsIfNecessary() { - //There can currently only be at most one outstanding transaction - //due to only having LocalTransaction support. Set value to 1 if 0. - _txnCount.compareAndSet(0,1); + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } } private void decrementOutstandingTxnsIfNecessary() { - //There can currently only be at most one outstanding transaction - //due to only having LocalTransaction support. Set value to 0 if 1. - _txnCount.compareAndSet(1,0); + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } } public Long getTxnStarts() @@ -313,7 +319,7 @@ public class AMQChannel implements SessionConfig } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional())); + _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); } } @@ -1031,7 +1037,7 @@ public class AMQChannel implements SessionConfig } - private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional) + private AMQMessage createAMQMessage(IncomingMessage incomingMessage) throws AMQException { @@ -1055,7 +1061,6 @@ public class AMQChannel implements SessionConfig private class MessageDeliveryAction implements ServerTransaction.Action { - private boolean _transactional; private IncomingMessage _incommingMessage; private ArrayList<? extends BaseQueue> _destinationQueues; @@ -1063,7 +1068,6 @@ public class AMQChannel implements SessionConfig ArrayList<? extends BaseQueue> destinationQueues, boolean transactional) { - _transactional = transactional; _incommingMessage = currentMessage; _destinationQueues = destinationQueues; } @@ -1074,7 +1078,7 @@ public class AMQChannel implements SessionConfig { final boolean immediate = _incommingMessage.isImmediate(); - final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional); + final AMQMessage amqMessage = createAMQMessage(_incommingMessage); MessageReference ref = amqMessage.newReference(); for(final BaseQueue queue : _destinationQueues) |