diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 69 |
1 files changed, 47 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a53e305e49..e6e3a9cadb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -69,7 +69,7 @@ public class SubscriptionImpl implements Subscription private boolean _closed = false; private AMQQueue _queue; - private final AtomicBoolean _resending = new AtomicBoolean(false); + private final AtomicBoolean _sendLock = new AtomicBoolean(false); public static class Factory implements SubscriptionFactory { @@ -193,7 +193,18 @@ public class SubscriptionImpl implements Subscription public String toString() { - return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; + String subscriber = "[channel=" + channel + + ", consumerTag=" + consumerTag + + ", session=" + protocolSession.getKey() + + ", resendQueue=" + (_resendQueue != null); + + if (_resendQueue != null) + { + subscriber += ", resendSize=" + _resendQueue.size(); + } + + + return subscriber + "]"; } /** @@ -239,7 +250,7 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? @@ -275,7 +286,7 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? @@ -292,7 +303,18 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended() && !_resending.get(); + if (_logger.isTraceEnabled()) + { + if (channel.isSuspended()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + } + if (_sendLock.get()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + } + } + return channel.isSuspended() || _sendLock.get(); } /** @@ -386,7 +408,20 @@ public class SubscriptionImpl implements Subscription public void close() { - _logger.info("Closing subscription:" + this); + synchronized (_sendLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting SendLock true"); + } + + _sendLock.set(true); + + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + } if (_resendQueue != null && !_resendQueue.isEmpty()) { @@ -411,17 +446,17 @@ public class SubscriptionImpl implements Subscription )); _closed = true; } + } private void requeue() { - if (_queue != null) { - _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); - - //Take control over to this thread for delivering messages from the Async Delivery. - setResending(true); + if (_logger.isTraceEnabled()) + { + _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); + } while (!_resendQueue.isEmpty()) { @@ -441,8 +476,6 @@ public class SubscriptionImpl implements Subscription } } - setResending(false); - if (!_resendQueue.isEmpty()) { _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); @@ -462,14 +495,6 @@ public class SubscriptionImpl implements Subscription _resendQueue = null; } - private void setResending(boolean resending) - { - synchronized (_resending) - { - _resending.set(resending); - } - } - public boolean isBrowser() { return _isBrowser; @@ -528,7 +553,7 @@ public class SubscriptionImpl implements Subscription public Object sendlock() { - return _resending; + return _sendLock; } private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered) |