summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-14 11:36:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-14 11:36:42 +0000
commit9db506849ee57d0669f5df47d8a84c18e20dfb1d (patch)
tree24c801e94ccfd31a2fc01236c0e26fbbf1809ee2
parentc57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0 (diff)
downloadqpid-python-9db506849ee57d0669f5df47d8a84c18e20dfb1d.tar.gz
QPID-852 : Updated broker so that it closes consumers when there are no messages on the queue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637066 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java34
3 files changed, 31 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4a0121700c..7c6db0b4b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -700,6 +700,8 @@ public class AMQQueue implements Managable, Comparable
{
_subscribers.setExclusive(true);
}
+
+ subscription.start();
}
private boolean isExclusive()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index a706098b71..96ce6743ec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -45,8 +45,6 @@ public interface Subscription
void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
- boolean isAutoClose();
-
void close();
boolean isClosed();
@@ -60,4 +58,6 @@ public interface Subscription
Object getSendLock();
AMQChannel getChannel();
+
+ void start();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 6e68b5637e..bde3ad8ec9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -461,7 +461,7 @@ public class SubscriptionImpl implements Subscription
}
}
- public boolean isAutoClose()
+ private boolean isAutoClose()
{
return _autoClose;
}
@@ -523,19 +523,24 @@ public class SubscriptionImpl implements Subscription
{
_logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
- ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
- _sentClose = true;
-
- //fixme JIRA do this better
+ boolean unregisteredOK = false;
try
{
- channel.unsubscribeConsumer(protocolSession, consumerTag);
+ unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag);
}
catch (AMQException e)
{
// Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK.");
}
+
+ if (unregisteredOK)
+ {
+ ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+ _sentClose = true;
+ }
+
}
}
@@ -666,4 +671,19 @@ public class SubscriptionImpl implements Subscription
return channel;
}
+ public void start()
+ {
+ //Check to see if we need to autoclose
+ if (filtersMessages())
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ autoclose();
+ }
+ }
+ }
+ }
+
}