diff options
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 31 |
1 files changed, 12 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9cc7d16ff3..54e76c94d7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -981,7 +981,7 @@ public class AMQChannel { final boolean immediate = _incommingMessage.isImmediate(); - ServerTransaction txn = null; + for(AMQQueue queue : _destinationQueues) { @@ -991,31 +991,24 @@ public class AMQChannel if(immediate && !entry.getDeliveredToConsumer() && entry.acquire()) { - if(txn == null) - { - txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries)); - } - + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries)); + txn.commit(); AMQMessage message = (AMQMessage) entry.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - new AMQShortString("Immediate delivery is not possible.")); + _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + new AMQShortString("Immediate delivery is not possible.")); } } - if(txn != null) - { - txn.commit(); - } } catch (AMQException e) { |