diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-03-14 11:36:42 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-03-14 11:36:42 +0000 |
commit | 9db506849ee57d0669f5df47d8a84c18e20dfb1d (patch) | |
tree | 24c801e94ccfd31a2fc01236c0e26fbbf1809ee2 | |
parent | c57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0 (diff) | |
download | qpid-python-9db506849ee57d0669f5df47d8a84c18e20dfb1d.tar.gz |
QPID-852 : Updated broker so that it closes consumers when there are no messages on the queue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637066 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 31 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4a0121700c..7c6db0b4b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -700,6 +700,8 @@ public class AMQQueue implements Managable, Comparable { _subscribers.setExclusive(true); } + + subscription.start(); } private boolean isExclusive() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index a706098b71..96ce6743ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -45,8 +45,6 @@ public interface Subscription void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst); - boolean isAutoClose(); - void close(); boolean isClosed(); @@ -60,4 +58,6 @@ public interface Subscription Object getSendLock(); AMQChannel getChannel(); + + void start(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 6e68b5637e..bde3ad8ec9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -461,7 +461,7 @@ public class SubscriptionImpl implements Subscription } } - public boolean isAutoClose() + private boolean isAutoClose() { return _autoClose; } @@ -523,19 +523,24 @@ public class SubscriptionImpl implements Subscription { _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this); - ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); - _sentClose = true; - - //fixme JIRA do this better + boolean unregisteredOK = false; try { - channel.unsubscribeConsumer(protocolSession, consumerTag); + unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag); } catch (AMQException e) { // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag. + _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK."); } + + if (unregisteredOK) + { + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); + _sentClose = true; + } + } } @@ -666,4 +671,19 @@ public class SubscriptionImpl implements Subscription return channel; } + public void start() + { + //Check to see if we need to autoclose + if (filtersMessages()) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + autoclose(); + } + } + } + } + } |