summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 18:50:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 18:50:44 +0000
commit5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f (patch)
tree164c84fa550f0dc85d748e989fa3afa0dcc02c7e
parent2fd7f7ca379d87633881d989a12abfd80ed84250 (diff)
downloadqpid-python-5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f.tar.gz
Fixed immediate messages to dequeue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829626 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java31
1 files changed, 12 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 9cc7d16ff3..54e76c94d7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -981,7 +981,7 @@ public class AMQChannel
{
final boolean immediate = _incommingMessage.isImmediate();
- ServerTransaction txn = null;
+
for(AMQQueue queue : _destinationQueues)
{
@@ -991,31 +991,24 @@ public class AMQChannel
if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
{
- if(txn == null)
- {
- txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
- }
-
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+ txn.commit();
AMQMessage message = (AMQMessage) entry.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- new AMQShortString("Immediate delivery is not possible."));
+ _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ new AMQShortString("Immediate delivery is not possible."));
}
}
- if(txn != null)
- {
- txn.commit();
- }
}
catch (AMQException e)
{