summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-13 00:30:34 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-13 00:30:34 +0000
commitd15fc6f29484cbd05e95b584beeb6e1a59da1367 (patch)
tree387ab3b78951c497dc39e307a0a1fb6dc789e09a /java/broker/src/main/java/org
parent26e07c83dd4df84f61767de0c16d6732e8a6e30d (diff)
downloadqpid-python-d15fc6f29484cbd05e95b584beeb6e1a59da1367.tar.gz
QPID-3829: use a seperate object for reference checking to stop the AMQMessage holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when evaluating no-local after store recovery.
Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and more reliable. This test should be removed if changes for QPID-3605 are undertaken. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1243379 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java15
5 files changed, 18 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2266de0ae4..e5e755bd23 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1108,7 +1108,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
message.setExpiration(incomingMessage.getExpiration());
- message.setClientIdentifier(_session);
+ message.setConnectionIdentifier(_session.getReference());
return message;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index 583e7d177f..6a0e4d216e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -58,7 +58,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
private final long _size;
- private Object _sessionIdentifier;
+ private Object _connectionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -218,19 +218,15 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
}
- public Object getPublisherIdentifier()
+ public Object getConnectionIdentifier()
{
- //todo store sessionIdentifier/client id with message in store
- //Currently the _sessionIdentifier will be null if the message has been
- // restored from a message Store
-
- return _sessionIdentifier;
+ return _connectionIdentifier;
}
- public void setClientIdentifier(final Object sessionIdentifier)
+ public void setConnectionIdentifier(final Object connectionIdentifier)
{
- _sessionIdentifier = sessionIdentifier;
+ _connectionIdentifier = connectionIdentifier;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1f59091eba..f6980be525 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -132,7 +132,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Subject _authorizedSubject;
private MethodDispatcher _dispatcher;
- private final long _sessionID;
+ private final long _connectionID;
+ private Object _reference = new Object();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
@@ -170,7 +171,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_codecFactory = new AMQCodecFactory(true, this);
setNetworkConnection(network);
- _sessionID = connectionId;
+ _connectionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -203,7 +204,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public long getSessionID()
{
- return _sessionID;
+ return _connectionID;
}
public LogActor getLogActor()
@@ -969,11 +970,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return getMethodRegistry();
}
- public Object getClientIdentifier()
- {
- return _network.getRemoteAddress();
- }
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -1464,4 +1460,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
+ public Object getReference()
+ {
+ return _reference;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b68f6097e0..6cd5b21f89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -172,7 +172,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void setClientProperties(FieldTable clientProperties);
- Object getClientIdentifier();
+ Object getReference();
VirtualHost getVirtualHost();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 0f6bc976de..32baa17fc7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -475,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean hasInterest(QueueEntry entry)
{
-
-
-
-
//check that the message hasn't been rejected
if (entry.isRejectedBy(getSubscriptionID()))
{
@@ -490,22 +486,17 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if (_noLocal)
{
-
AMQMessage message = (AMQMessage) entry.getMessage();
- //todo - client id should be recorded so we don't have to handle
- // the case where this is null.
- final Object publisher = message.getPublisherIdentifier();
+ final Object publisherReference = message.getConnectionIdentifier();
// We don't want local messages so check to see if message is one we sent
- Object localInstance = getProtocolSession();
+ Object localReference = getProtocolSession().getReference();
- if(publisher.equals(localInstance))
+ if(publisherReference != null && publisherReference.equals(localReference))
{
return false;
}
-
-
}