summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-16 08:55:22 +0000
committerKeith Wall <kwall@apache.org>2014-12-16 08:55:22 +0000
commit77dbff8daed044b9c95ef0c210b5fe69ff9bbc2c (patch)
treeabc7a6f98746d78e896842852da4847f937f598f /qpid/java
parent96f9c3e225682940e18bd5fa6a9a270a80de034c (diff)
downloadqpid-python-77dbff8daed044b9c95ef0c210b5fe69ff9bbc2c.tar.gz
QPID-6272: [Java Broker] Null reference to AMQChannel#defaultQueue once the queue is deleted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1645880 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java50
1 files changed, 36 insertions, 14 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 2550221949..5461b47eb4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -124,6 +124,8 @@ public class AMQChannel
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
+ private final DefaultQueueAssociationClearingTask
+ _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
//TODO use Broker property to configure message authorization requirements
private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH);
@@ -140,7 +142,7 @@ public class AMQChannel
private long _deliveryTag = 0;
/** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
- private AMQQueue _defaultQueue;
+ private volatile AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
@@ -181,11 +183,9 @@ public class AMQChannel
private LogSubject _logSubject;
private volatile boolean _rollingBack;
- private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
@@ -1289,17 +1289,6 @@ public class AMQChannel
return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
- public void setDefaultQueue(AMQQueue queue)
- {
- _defaultQueue = queue;
- }
-
- public AMQQueue getDefaultQueue()
- {
- return _defaultQueue;
- }
-
-
public boolean isClosing()
{
return _closing.get();
@@ -3585,4 +3574,37 @@ public class AMQChannel
return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
}
+ private void setDefaultQueue(AMQQueue queue)
+ {
+ AMQQueue currentDefaultQueue = _defaultQueue;
+ if (queue != currentDefaultQueue)
+ {
+ if (currentDefaultQueue != null)
+ {
+ currentDefaultQueue.removeDeleteTask(_defaultQueueAssociationClearingTask);
+ }
+ if (queue != null)
+ {
+ queue.addDeleteTask(_defaultQueueAssociationClearingTask);
+ }
+ }
+ _defaultQueue = queue;
+ }
+
+ private AMQQueue getDefaultQueue()
+ {
+ return _defaultQueue;
+ }
+
+ private class DefaultQueueAssociationClearingTask implements Action<AMQQueue>
+ {
+ @Override
+ public void performAction(final AMQQueue queue)
+ {
+ if ( queue == _defaultQueue)
+ {
+ _defaultQueue = null;
+ }
+ }
+ }
}