diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 11:39:06 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 11:39:06 +0000 |
commit | f1745fde1a2164878794f0bcf62fa62aacbe5cae (patch) | |
tree | d5e2cfd2e3c434a301e7e89b62f412ff5a587f65 | |
parent | 62255135a3cf5ebd85bc465e77fff1715cdebbdc (diff) | |
download | qpid-python-f1745fde1a2164878794f0bcf62fa62aacbe5cae.tar.gz |
Moved delivery tag retrieval and addition to Unack'd list back to SubscriptionImpl as the synchronized block must include writing the frame to the wire.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/org/apache/qpid/server/AMQChannel.java | 12 | ||||
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java | 24 |
2 files changed, 16 insertions, 20 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index d4226c42aa..7cd9277c89 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -474,18 +474,6 @@ public class AMQChannel implements Managable } } - public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue) - { - long deliveryTag = getNextDeliveryTag(); - - if (acks) - { - addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } - - return deliveryTag; - } - /** * Acknowledge one or more messages. * diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index ef18f61070..3a00a486ae 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -53,13 +53,13 @@ public class SubscriptionImpl implements Subscription public static class Factory implements SubscriptionFactory { public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException + throws AMQException { return new SubscriptionImpl(channel, protocolSession, consumerTag, acks); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) - throws AMQException + throws AMQException { return new SubscriptionImpl(channel, protocolSession, consumerTag); } @@ -67,7 +67,7 @@ public class SubscriptionImpl implements Subscription public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException + throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) @@ -84,7 +84,7 @@ public class SubscriptionImpl implements Subscription public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, String consumerTag) - throws AMQException + throws AMQException { this(channel, protocolSession, consumerTag, false); } @@ -126,13 +126,20 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { - long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue); + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } - protocolSession.writeFrame(frame); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + protocolSession.writeFrame(frame); + } // if we do not need to wait for client acknowledgements we can decrement // the reference count immediately if (!_acks) @@ -160,6 +167,7 @@ public class SubscriptionImpl implements Subscription /** * Callback indicating that a queue has been deleted. + * * @param queue */ public void queueDeleted(AMQQueue queue) |