summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-13 00:30:34 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-13 00:30:34 +0000
commitf3fb8a56864f89941fb527c8855018ce470143e1 (patch)
tree1dbb0ff9d370f847174c008b800d3b106e370c91
parenta6cc4cb2fba4dbc260b2a8b58512f7c7a24ba5be (diff)
downloadqpid-python-f3fb8a56864f89941fb527c8855018ce470143e1.tar.gz
QPID-3829: use a seperate object for reference checking to stop the AMQMessage holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when evaluating no-local after store recovery.
Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and more reliable. This test should be removed if changes for QPID-3605 are undertaken. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243379 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java15
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java171
-rw-r--r--qpid/java/test-profiles/JavaExcludes3
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes1
8 files changed, 49 insertions, 175 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2266de0ae4..e5e755bd23 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1108,7 +1108,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
message.setExpiration(incomingMessage.getExpiration());
- message.setClientIdentifier(_session);
+ message.setConnectionIdentifier(_session.getReference());
return message;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index 583e7d177f..6a0e4d216e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -58,7 +58,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
private final long _size;
- private Object _sessionIdentifier;
+ private Object _connectionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -218,19 +218,15 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
}
- public Object getPublisherIdentifier()
+ public Object getConnectionIdentifier()
{
- //todo store sessionIdentifier/client id with message in store
- //Currently the _sessionIdentifier will be null if the message has been
- // restored from a message Store
-
- return _sessionIdentifier;
+ return _connectionIdentifier;
}
- public void setClientIdentifier(final Object sessionIdentifier)
+ public void setConnectionIdentifier(final Object connectionIdentifier)
{
- _sessionIdentifier = sessionIdentifier;
+ _connectionIdentifier = connectionIdentifier;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1f59091eba..f6980be525 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -132,7 +132,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Subject _authorizedSubject;
private MethodDispatcher _dispatcher;
- private final long _sessionID;
+ private final long _connectionID;
+ private Object _reference = new Object();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
@@ -170,7 +171,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_codecFactory = new AMQCodecFactory(true, this);
setNetworkConnection(network);
- _sessionID = connectionId;
+ _connectionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -203,7 +204,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public long getSessionID()
{
- return _sessionID;
+ return _connectionID;
}
public LogActor getLogActor()
@@ -969,11 +970,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return getMethodRegistry();
}
- public Object getClientIdentifier()
- {
- return _network.getRemoteAddress();
- }
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -1464,4 +1460,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
+ public Object getReference()
+ {
+ return _reference;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b68f6097e0..6cd5b21f89 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -172,7 +172,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void setClientProperties(FieldTable clientProperties);
- Object getClientIdentifier();
+ Object getReference();
VirtualHost getVirtualHost();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 0f6bc976de..32baa17fc7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -475,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean hasInterest(QueueEntry entry)
{
-
-
-
-
//check that the message hasn't been rejected
if (entry.isRejectedBy(getSubscriptionID()))
{
@@ -490,22 +486,17 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if (_noLocal)
{
-
AMQMessage message = (AMQMessage) entry.getMessage();
- //todo - client id should be recorded so we don't have to handle
- // the case where this is null.
- final Object publisher = message.getPublisherIdentifier();
+ final Object publisherReference = message.getConnectionIdentifier();
// We don't want local messages so check to see if message is one we sent
- Object localInstance = getProtocolSession();
+ Object localReference = getProtocolSession().getReference();
- if(publisher.equals(localInstance))
+ if(publisherReference != null && publisherReference.equals(localReference))
{
return false;
}
-
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
index a95d07ec45..2e259191aa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
@@ -20,14 +20,8 @@
*/
package org.apache.qpid.server.persistent;
-import org.apache.commons.configuration.XMLConfiguration;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.server.store.DerbyMessageStore;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -36,60 +30,28 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
- * QPID-1813 : We do not store the client id with a message so on store restart
- * that information is lost and we are unable to perform no local checks.
- *
- * QPID-1813 highlights the lack of testing here as the broker will NPE as it
- * assumes that the client id of the publisher will always exist
+ * Verifies that after recovery, a new Connection with no-local in use is
+ * able to receive messages sent prior to the broker restart.
*/
-public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements ConnectionListener
+public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
{
protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
protected static final int SEND_COUNT = 10;
- private CountDownLatch _failoverComplete = new CountDownLatch(1);
-
- protected ConnectionURL _connectionURL;
-
- @Override
- protected void setUp() throws Exception
- {
-
- XMLConfiguration configuration = new XMLConfiguration(_configFile);
- configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore");
- configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest");
-
- File tmpFile = File.createTempFile("configFile", "test");
- tmpFile.deleteOnExit();
- configuration.save(tmpFile);
-
- _configFile = tmpFile;
- _connectionURL = getConnectionURL();
-
- BrokerDetails details = _connectionURL.getBrokerDetails(0);
-
- // This will attempt to failover for 3 seconds.
- // Local testing suggests failover takes 2 seconds
- details.setProperty(BrokerDetails.OPTIONS_RETRY, "10");
- details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500");
-
- super.setUp();
- }
public void test() throws Exception
{
+ if(!isBrokerStorePersistent())
+ {
+ fail("This test requires a broker with a persistent store");
+ }
- Connection connection = getConnection(_connectionURL);
+ Connection connection = getConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- Topic topic = (Topic) getInitialContext().lookup("topic");
+ Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
TopicSubscriber noLocalSubscriber = session.
createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
@@ -99,88 +61,40 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn
createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal",
null, false);
- List<Message> sent = sendMessage(session, topic, SEND_COUNT);
-
- session.commit();
-
- assertEquals("Incorrect number of messages sent",
- SEND_COUNT, sent.size());
-
+ sendMessage(session, topic, SEND_COUNT);
// Check messages can be received as expected.
connection.start();
- assertTrue("No Local Subscriber is not a no-local subscriber",
- noLocalSubscriber.getNoLocal());
-
- assertFalse("Normal Subscriber is a no-local subscriber",
- normalSubscriber.getNoLocal());
-
-
List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
assertEquals("No Local Subscriber Received messages", 0, received.size());
received = receiveMessage(normalSubscriber, SEND_COUNT);
assertEquals("Normal Subscriber Received no messages",
SEND_COUNT, received.size());
+ session.commit();
+ connection.close();
-
- ((AMQConnection)connection).setConnectionListener(this);
-
+ //We didn't receive the messages on the durable queue for the no-local subscriber
+ //so they are still on the broker. Restart the broker, prompting their recovery.
restartBroker();
+ Connection connection2 = getConnection();
+ connection2.start();
- //Await
- if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS))
- {
- fail("Failover Failed to compelete");
- }
-
- session.rollback();
-
- //Failover will restablish our clients
- assertTrue("No Local Subscriber is not a no-local subscriber",
- noLocalSubscriber.getNoLocal());
-
- assertFalse("Normal Subscriber is a no-local subscriber",
- normalSubscriber.getNoLocal());
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+ TopicSubscriber noLocalSubscriber2 = session2.
+ createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
+ null, true);
- // NOTE : here that the NO-local subscriber actually now gets ALL the
- // messages as the connection has failed and they are consuming on a
- // different connnection to the one that was published on.
- received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+ // The NO-local subscriber should now get ALL the messages
+ // as they are being consumed on a different connection to
+ // the one that they were published on.
+ received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+ session2.commit();
assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size());
-
- received = receiveMessage(normalSubscriber, SEND_COUNT);
- assertEquals("Normal Subscriber Received no messages",
- SEND_COUNT, received.size());
-
- //leave the store in a clean state.
- session.commit();
- }
-
- protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer,
- int count) throws JMSException
- {
-
- List<Message> receivedMessages = new ArrayList<Message>(count);
- for (int i = 0; i < count; i++)
- {
- Message received = messageConsumer.receive(1000);
-
- if (received != null)
- {
- receivedMessages.add(received);
- }
- else
- {
- fail("Only "
- + receivedMessages.size() + "/" + count + " received.");
- }
- }
-
- return receivedMessages;
}
protected List<Message> receiveMessage(MessageConsumer messageConsumer,
@@ -204,29 +118,4 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn
return receivedMessages;
}
-
- public void bytesSent(long count)
- {
-
- }
-
- public void bytesReceived(long count)
- {
-
- }
-
- public boolean preFailover(boolean redirect)
- {
- return true;
- }
-
- public boolean preResubscribe()
- {
- return true;
- }
-
- public void failoverComplete()
- {
- _failoverComplete.countDown();
- }
}
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index 08e16ff216..e123d02918 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -28,9 +28,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
//Moved from JavaStandaloneExcludes when it was removed
///////////////////////////////////////////////////////
-//QPID-1818, QPID-1821 : Client code path does not correctly restore a transacted session after failover.
-org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
-
//XA functionality is not fully implemented yet
org.apache.qpid.jms.xa.XAResourceTest#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index b4e583ba3a..7cc541c8b9 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -18,6 +18,7 @@
//
//These tests require a persistent store
+org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
org.apache.qpid.server.store.PersistentStoreTest#*
org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod