diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 18:50:44 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 18:50:44 +0000 |
commit | 5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f (patch) | |
tree | 164c84fa550f0dc85d748e989fa3afa0dcc02c7e | |
parent | 2fd7f7ca379d87633881d989a12abfd80ed84250 (diff) | |
download | qpid-python-5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f.tar.gz |
Fixed immediate messages to dequeue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829626 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 31 |
1 files changed, 12 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9cc7d16ff3..54e76c94d7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -981,7 +981,7 @@ public class AMQChannel { final boolean immediate = _incommingMessage.isImmediate(); - ServerTransaction txn = null; + for(AMQQueue queue : _destinationQueues) { @@ -991,31 +991,24 @@ public class AMQChannel if(immediate && !entry.getDeliveredToConsumer() && entry.acquire()) { - if(txn == null) - { - txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries)); - } - + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries)); + txn.commit(); AMQMessage message = (AMQMessage) entry.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - new AMQShortString("Immediate delivery is not possible.")); + _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + new AMQShortString("Immediate delivery is not possible.")); } } - if(txn != null) - { - txn.commit(); - } } catch (AMQException e) { |