diff options
author | Keith Wall <kwall@apache.org> | 2014-12-16 08:55:22 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-12-16 08:55:22 +0000 |
commit | 77dbff8daed044b9c95ef0c210b5fe69ff9bbc2c (patch) | |
tree | abc7a6f98746d78e896842852da4847f937f598f /qpid/java | |
parent | 96f9c3e225682940e18bd5fa6a9a270a80de034c (diff) | |
download | qpid-python-77dbff8daed044b9c95ef0c210b5fe69ff9bbc2c.tar.gz |
QPID-6272: [Java Broker] Null reference to AMQChannel#defaultQueue once the queue is deleted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1645880 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 50 |
1 files changed, 36 insertions, 14 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 2550221949..5461b47eb4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -124,6 +124,8 @@ public class AMQChannel public static final int DEFAULT_PREFETCH = 4096; private static final Logger _logger = Logger.getLogger(AMQChannel.class); + private final DefaultQueueAssociationClearingTask + _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask(); //TODO use Broker property to configure message authorization requirements private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH); @@ -140,7 +142,7 @@ public class AMQChannel private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */ - private AMQQueue _defaultQueue; + private volatile AMQQueue _defaultQueue; /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ private int _consumerTag; @@ -181,11 +183,9 @@ public class AMQChannel private LogSubject _logSubject; private volatile boolean _rollingBack; - private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; private List<MessageInstance> _resendList = new ArrayList<MessageInstance>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); - private long _createTime = System.currentTimeMillis(); private final ClientDeliveryMethod _clientDeliveryMethod; @@ -1289,17 +1289,6 @@ public class AMQChannel return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]"; } - public void setDefaultQueue(AMQQueue queue) - { - _defaultQueue = queue; - } - - public AMQQueue getDefaultQueue() - { - return _defaultQueue; - } - - public boolean isClosing() { return _closing.get(); @@ -3585,4 +3574,37 @@ public class AMQChannel return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName); } + private void setDefaultQueue(AMQQueue queue) + { + AMQQueue currentDefaultQueue = _defaultQueue; + if (queue != currentDefaultQueue) + { + if (currentDefaultQueue != null) + { + currentDefaultQueue.removeDeleteTask(_defaultQueueAssociationClearingTask); + } + if (queue != null) + { + queue.addDeleteTask(_defaultQueueAssociationClearingTask); + } + } + _defaultQueue = queue; + } + + private AMQQueue getDefaultQueue() + { + return _defaultQueue; + } + + private class DefaultQueueAssociationClearingTask implements Action<AMQQueue> + { + @Override + public void performAction(final AMQQueue queue) + { + if ( queue == _defaultQueue) + { + _defaultQueue = null; + } + } + } } |