diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 18:57:16 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 18:57:16 +0000 |
commit | f05bf34b207e12b38cfa3011cfe00cff5ffc47a4 (patch) | |
tree | 0f0559045390dcb82e2b22dc259f504cb44b5319 | |
parent | 5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f (diff) | |
download | qpid-python-f05bf34b207e12b38cfa3011cfe00cff5ffc47a4.tar.gz |
Fixed immediate messages to dequeue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829628 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 54e76c94d7..35da132833 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -55,6 +55,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.output.ProtocolOutputConverter; public class AMQChannel { @@ -119,6 +120,8 @@ public class AMQChannel private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); + private static final + AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException @@ -991,19 +994,42 @@ public class AMQChannel if(immediate && !entry.getDeliveredToConsumer() && entry.acquire()) { + + ServerTransaction txn = new LocalTransaction(_messageStore); Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); entries.add(entry); - txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries)); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() + { + try + { + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + super.postCommit(); + } + } + ); txn.commit(); - AMQMessage message = (AMQMessage) entry.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - new AMQShortString("Immediate delivery is not possible.")); + } |