summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java69
1 files changed, 47 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a53e305e49..e6e3a9cadb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -69,7 +69,7 @@ public class SubscriptionImpl implements Subscription
private boolean _closed = false;
private AMQQueue _queue;
- private final AtomicBoolean _resending = new AtomicBoolean(false);
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
public static class Factory implements SubscriptionFactory
{
@@ -193,7 +193,18 @@ public class SubscriptionImpl implements Subscription
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
@@ -239,7 +250,7 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
//fixme what is wrong with this?
@@ -275,7 +286,7 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
//fixme what is wrong with this?
@@ -292,7 +303,18 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended() && !_resending.get();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -386,7 +408,20 @@ public class SubscriptionImpl implements Subscription
public void close()
{
- _logger.info("Closing subscription:" + this);
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ }
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -411,17 +446,17 @@ public class SubscriptionImpl implements Subscription
));
_closed = true;
}
+
}
private void requeue()
{
-
if (_queue != null)
{
- _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
-
- //Take control over to this thread for delivering messages from the Async Delivery.
- setResending(true);
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
+ }
while (!_resendQueue.isEmpty())
{
@@ -441,8 +476,6 @@ public class SubscriptionImpl implements Subscription
}
}
- setResending(false);
-
if (!_resendQueue.isEmpty())
{
_logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
@@ -462,14 +495,6 @@ public class SubscriptionImpl implements Subscription
_resendQueue = null;
}
- private void setResending(boolean resending)
- {
- synchronized (_resending)
- {
- _resending.set(resending);
- }
- }
-
public boolean isBrowser()
{
return _isBrowser;
@@ -528,7 +553,7 @@ public class SubscriptionImpl implements Subscription
public Object sendlock()
{
- return _resending;
+ return _sendLock;
}
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered)