summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 18:57:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 18:57:16 +0000
commitf05bf34b207e12b38cfa3011cfe00cff5ffc47a4 (patch)
tree0f0559045390dcb82e2b22dc259f504cb44b5319
parent5da2a7c946faa4a2d3cf9358f7e4508f1f16f99f (diff)
downloadqpid-python-f05bf34b207e12b38cfa3011cfe00cff5ffc47a4.tar.gz
Fixed immediate messages to dequeue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829628 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java42
1 files changed, 34 insertions, 8 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 54e76c94d7..35da132833 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
public class AMQChannel
{
@@ -119,6 +120,8 @@ public class AMQChannel
private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+ private static final
+ AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -991,19 +994,42 @@ public class AMQChannel
if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
{
+
+
ServerTransaction txn = new LocalTransaction(_messageStore);
Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
entries.add(entry);
- txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
+ {
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ super.postCommit();
+ }
+ }
+ );
txn.commit();
- AMQMessage message = (AMQMessage) entry.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- new AMQShortString("Immediate delivery is not possible."));
+
}