diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
commit | d15fc6f29484cbd05e95b584beeb6e1a59da1367 (patch) | |
tree | 387ab3b78951c497dc39e307a0a1fb6dc789e09a /java/broker/src/main/java | |
parent | 26e07c83dd4df84f61767de0c16d6732e8a6e30d (diff) | |
download | qpid-python-d15fc6f29484cbd05e95b584beeb6e1a59da1367.tar.gz |
QPID-3829: use a seperate object for reference checking to stop the AMQMessage holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when evaluating no-local after store recovery.
Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and more reliable. This test should be removed if changes for QPID-3605 are undertaken.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1243379 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java')
5 files changed, 18 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2266de0ae4..e5e755bd23 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1108,7 +1108,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); message.setExpiration(incomingMessage.getExpiration()); - message.setClientIdentifier(_session); + message.setConnectionIdentifier(_session.getReference()); return message; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index 583e7d177f..6a0e4d216e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -58,7 +58,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> private final long _size; - private Object _sessionIdentifier; + private Object _connectionIdentifier; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); public AMQMessage(StoredMessage<MessageMetaData> handle) @@ -218,19 +218,15 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> } - public Object getPublisherIdentifier() + public Object getConnectionIdentifier() { - //todo store sessionIdentifier/client id with message in store - //Currently the _sessionIdentifier will be null if the message has been - // restored from a message Store - - return _sessionIdentifier; + return _connectionIdentifier; } - public void setClientIdentifier(final Object sessionIdentifier) + public void setConnectionIdentifier(final Object connectionIdentifier) { - _sessionIdentifier = sessionIdentifier; + _connectionIdentifier = connectionIdentifier; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1f59091eba..f6980be525 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -132,7 +132,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private Subject _authorizedSubject; private MethodDispatcher _dispatcher; - private final long _sessionID; + private final long _connectionID; + private Object _reference = new Object(); private AMQPConnectionActor _actor; private LogSubject _logSubject; @@ -170,7 +171,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _codecFactory = new AMQCodecFactory(true, this); setNetworkConnection(network); - _sessionID = connectionId; + _connectionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -203,7 +204,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public long getSessionID() { - return _sessionID; + return _connectionID; } public LogActor getLogActor() @@ -969,11 +970,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return getMethodRegistry(); } - public Object getClientIdentifier() - { - return _network.getRemoteAddress(); - } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -1464,4 +1460,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } + public Object getReference() + { + return _reference; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b68f6097e0..6cd5b21f89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -172,7 +172,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setClientProperties(FieldTable clientProperties); - Object getClientIdentifier(); + Object getReference(); VirtualHost getVirtualHost(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 0f6bc976de..32baa17fc7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -475,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { - - - - //check that the message hasn't been rejected if (entry.isRejectedBy(getSubscriptionID())) { @@ -490,22 +486,17 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if (_noLocal) { - AMQMessage message = (AMQMessage) entry.getMessage(); - //todo - client id should be recorded so we don't have to handle - // the case where this is null. - final Object publisher = message.getPublisherIdentifier(); + final Object publisherReference = message.getConnectionIdentifier(); // We don't want local messages so check to see if message is one we sent - Object localInstance = getProtocolSession(); + Object localReference = getProtocolSession().getReference(); - if(publisher.equals(localInstance)) + if(publisherReference != null && publisherReference.equals(localReference)) { return false; } - - } |