summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-09-27 11:39:06 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-09-27 11:39:06 +0000
commitf1745fde1a2164878794f0bcf62fa62aacbe5cae (patch)
treed5e2cfd2e3c434a301e7e89b62f412ff5a587f65
parent62255135a3cf5ebd85bc465e77fff1715cdebbdc (diff)
downloadqpid-python-f1745fde1a2164878794f0bcf62fa62aacbe5cae.tar.gz
Moved delivery tag retrieval and addition to Unack'd list back to SubscriptionImpl as the synchronized block must include writing the frame to the wire.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java12
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java24
2 files changed, 16 insertions, 20 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index d4226c42aa..7cd9277c89 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -474,18 +474,6 @@ public class AMQChannel implements Managable
}
}
- public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue)
- {
- long deliveryTag = getNextDeliveryTag();
-
- if (acks)
- {
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
-
- return deliveryTag;
- }
-
/**
* Acknowledge one or more messages.
*
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index ef18f61070..3a00a486ae 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -53,13 +53,13 @@ public class SubscriptionImpl implements Subscription
public static class Factory implements SubscriptionFactory
{
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
+ throws AMQException
{
return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
- throws AMQException
+ throws AMQException
{
return new SubscriptionImpl(channel, protocolSession, consumerTag);
}
@@ -67,7 +67,7 @@ public class SubscriptionImpl implements Subscription
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
String consumerTag, boolean acks)
- throws AMQException
+ throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
@@ -84,7 +84,7 @@ public class SubscriptionImpl implements Subscription
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
- throws AMQException
+ throws AMQException
{
this(channel, protocolSession, consumerTag, false);
}
@@ -126,13 +126,20 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue);
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
- protocolSession.writeFrame(frame);
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ protocolSession.writeFrame(frame);
+ }
// if we do not need to wait for client acknowledgements we can decrement
// the reference count immediately
if (!_acks)
@@ -160,6 +167,7 @@ public class SubscriptionImpl implements Subscription
/**
* Callback indicating that a queue has been deleted.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue)