diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
commit | f3fb8a56864f89941fb527c8855018ce470143e1 (patch) | |
tree | 1dbb0ff9d370f847174c008b800d3b106e370c91 | |
parent | a6cc4cb2fba4dbc260b2a8b58512f7c7a24ba5be (diff) | |
download | qpid-python-f3fb8a56864f89941fb527c8855018ce470143e1.tar.gz |
QPID-3829: use a seperate object for reference checking to stop the AMQMessage holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when evaluating no-local after store recovery.
Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and more reliable. This test should be removed if changes for QPID-3605 are undertaken.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243379 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 49 insertions, 175 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2266de0ae4..e5e755bd23 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1108,7 +1108,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); message.setExpiration(incomingMessage.getExpiration()); - message.setClientIdentifier(_session); + message.setConnectionIdentifier(_session.getReference()); return message; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index 583e7d177f..6a0e4d216e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -58,7 +58,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> private final long _size; - private Object _sessionIdentifier; + private Object _connectionIdentifier; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); public AMQMessage(StoredMessage<MessageMetaData> handle) @@ -218,19 +218,15 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> } - public Object getPublisherIdentifier() + public Object getConnectionIdentifier() { - //todo store sessionIdentifier/client id with message in store - //Currently the _sessionIdentifier will be null if the message has been - // restored from a message Store - - return _sessionIdentifier; + return _connectionIdentifier; } - public void setClientIdentifier(final Object sessionIdentifier) + public void setConnectionIdentifier(final Object connectionIdentifier) { - _sessionIdentifier = sessionIdentifier; + _connectionIdentifier = connectionIdentifier; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1f59091eba..f6980be525 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -132,7 +132,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private Subject _authorizedSubject; private MethodDispatcher _dispatcher; - private final long _sessionID; + private final long _connectionID; + private Object _reference = new Object(); private AMQPConnectionActor _actor; private LogSubject _logSubject; @@ -170,7 +171,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _codecFactory = new AMQCodecFactory(true, this); setNetworkConnection(network); - _sessionID = connectionId; + _connectionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -203,7 +204,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public long getSessionID() { - return _sessionID; + return _connectionID; } public LogActor getLogActor() @@ -969,11 +970,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return getMethodRegistry(); } - public Object getClientIdentifier() - { - return _network.getRemoteAddress(); - } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -1464,4 +1460,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } + public Object getReference() + { + return _reference; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b68f6097e0..6cd5b21f89 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -172,7 +172,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setClientProperties(FieldTable clientProperties); - Object getClientIdentifier(); + Object getReference(); VirtualHost getVirtualHost(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 0f6bc976de..32baa17fc7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -475,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { - - - - //check that the message hasn't been rejected if (entry.isRejectedBy(getSubscriptionID())) { @@ -490,22 +486,17 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if (_noLocal) { - AMQMessage message = (AMQMessage) entry.getMessage(); - //todo - client id should be recorded so we don't have to handle - // the case where this is null. - final Object publisher = message.getPublisherIdentifier(); + final Object publisherReference = message.getConnectionIdentifier(); // We don't want local messages so check to see if message is one we sent - Object localInstance = getProtocolSession(); + Object localReference = getProtocolSession().getReference(); - if(publisher.equals(localInstance)) + if(publisherReference != null && publisherReference.equals(localReference)) { return false; } - - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java index a95d07ec45..2e259191aa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -20,14 +20,8 @@ */ package org.apache.qpid.server.persistent; -import org.apache.commons.configuration.XMLConfiguration; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.store.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.ArrayList; +import java.util.List; import javax.jms.Connection; import javax.jms.JMSException; @@ -36,60 +30,28 @@ import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** - * QPID-1813 : We do not store the client id with a message so on store restart - * that information is lost and we are unable to perform no local checks. - * - * QPID-1813 highlights the lack of testing here as the broker will NPE as it - * assumes that the client id of the publisher will always exist + * Verifies that after recovery, a new Connection with no-local in use is + * able to receive messages sent prior to the broker restart. */ -public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements ConnectionListener +public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase { protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName(); protected static final int SEND_COUNT = 10; - private CountDownLatch _failoverComplete = new CountDownLatch(1); - - protected ConnectionURL _connectionURL; - - @Override - protected void setUp() throws Exception - { - - XMLConfiguration configuration = new XMLConfiguration(_configFile); - configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore"); - configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest"); - - File tmpFile = File.createTempFile("configFile", "test"); - tmpFile.deleteOnExit(); - configuration.save(tmpFile); - - _configFile = tmpFile; - _connectionURL = getConnectionURL(); - - BrokerDetails details = _connectionURL.getBrokerDetails(0); - - // This will attempt to failover for 3 seconds. - // Local testing suggests failover takes 2 seconds - details.setProperty(BrokerDetails.OPTIONS_RETRY, "10"); - details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500"); - - super.setUp(); - } public void test() throws Exception { + if(!isBrokerStorePersistent()) + { + fail("This test requires a broker with a persistent store"); + } - Connection connection = getConnection(_connectionURL); + Connection connection = getConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - Topic topic = (Topic) getInitialContext().lookup("topic"); + Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); TopicSubscriber noLocalSubscriber = session. createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", @@ -99,88 +61,40 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal", null, false); - List<Message> sent = sendMessage(session, topic, SEND_COUNT); - - session.commit(); - - assertEquals("Incorrect number of messages sent", - SEND_COUNT, sent.size()); - + sendMessage(session, topic, SEND_COUNT); // Check messages can be received as expected. connection.start(); - assertTrue("No Local Subscriber is not a no-local subscriber", - noLocalSubscriber.getNoLocal()); - - assertFalse("Normal Subscriber is a no-local subscriber", - normalSubscriber.getNoLocal()); - - List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT); assertEquals("No Local Subscriber Received messages", 0, received.size()); received = receiveMessage(normalSubscriber, SEND_COUNT); assertEquals("Normal Subscriber Received no messages", SEND_COUNT, received.size()); + session.commit(); + connection.close(); - - ((AMQConnection)connection).setConnectionListener(this); - + //We didn't receive the messages on the durable queue for the no-local subscriber + //so they are still on the broker. Restart the broker, prompting their recovery. restartBroker(); + Connection connection2 = getConnection(); + connection2.start(); - //Await - if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS)) - { - fail("Failover Failed to compelete"); - } - - session.rollback(); - - //Failover will restablish our clients - assertTrue("No Local Subscriber is not a no-local subscriber", - noLocalSubscriber.getNoLocal()); - - assertFalse("Normal Subscriber is a no-local subscriber", - normalSubscriber.getNoLocal()); + Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED); + Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); + TopicSubscriber noLocalSubscriber2 = session2. + createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", + null, true); - // NOTE : here that the NO-local subscriber actually now gets ALL the - // messages as the connection has failed and they are consuming on a - // different connnection to the one that was published on. - received = receiveMessage(noLocalSubscriber, SEND_COUNT); + // The NO-local subscriber should now get ALL the messages + // as they are being consumed on a different connection to + // the one that they were published on. + received = receiveMessage(noLocalSubscriber2, SEND_COUNT); + session2.commit(); assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size()); - - received = receiveMessage(normalSubscriber, SEND_COUNT); - assertEquals("Normal Subscriber Received no messages", - SEND_COUNT, received.size()); - - //leave the store in a clean state. - session.commit(); - } - - protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer, - int count) throws JMSException - { - - List<Message> receivedMessages = new ArrayList<Message>(count); - for (int i = 0; i < count; i++) - { - Message received = messageConsumer.receive(1000); - - if (received != null) - { - receivedMessages.add(received); - } - else - { - fail("Only " - + receivedMessages.size() + "/" + count + " received."); - } - } - - return receivedMessages; } protected List<Message> receiveMessage(MessageConsumer messageConsumer, @@ -204,29 +118,4 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn return receivedMessages; } - - public void bytesSent(long count) - { - - } - - public void bytesReceived(long count) - { - - } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _failoverComplete.countDown(); - } } diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index 08e16ff216..e123d02918 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -28,9 +28,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy //Moved from JavaStandaloneExcludes when it was removed /////////////////////////////////////////////////////// -//QPID-1818, QPID-1821 : Client code path does not correctly restore a transacted session after failover. -org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* - //XA functionality is not fully implemented yet org.apache.qpid.jms.xa.XAResourceTest#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index b4e583ba3a..7cc541c8b9 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -18,6 +18,7 @@ // //These tests require a persistent store +org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* org.apache.qpid.server.store.PersistentStoreTest#* org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod |