From 0f66d4c96c6810d01c6dc7e5c571732430c7aa0e Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 14 Apr 2009 15:05:10 +0000 Subject: Merged change to SlowMessageStore to ensure we don't infinitely configure the same class. QPID-1621: add ServerConfiguration, QueueConfiguration and SecurityConfiguration classes. Move almost all uses of o.a.commons.configuration.Configuration behind there. @Configured delenda est Merged changed from trunk r745799 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764816 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/010ExcludeList | 2 + qpid/java/08ExcludeList-nonvm | 4 + .../java/org/apache/qpid/client/AMQConnection.java | 12 +- .../java/org/apache/qpid/client/AMQSession.java | 55 ++++++++-- .../apache/qpid/client/BasicMessageConsumer.java | 9 +- .../java/org/apache/qpid/client/Closeable.java | 18 +++ .../CloseAfterConnectionFailureTest.java | 121 +++++++++++++++++++++ 7 files changed, 208 insertions(+), 13 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList index c685f2dd6b..d4e9d48fe5 100644 --- a/qpid/java/010ExcludeList +++ b/qpid/java/010ExcludeList @@ -59,3 +59,5 @@ org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* org.apache.qpid.server.queue.PriorityTest#* org.apache.qpid.server.queue.TimeToLiveTest#* +// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM +org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* diff --git a/qpid/java/08ExcludeList-nonvm b/qpid/java/08ExcludeList-nonvm index eb6c60b225..43e998e6b7 100644 --- a/qpid/java/08ExcludeList-nonvm +++ b/qpid/java/08ExcludeList-nonvm @@ -27,3 +27,7 @@ org.apache.qpid.server.security.acl.SimpleACLTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* +org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait + +// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM +org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 269937d0bd..5347e20e96 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -58,6 +58,7 @@ import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -889,7 +890,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!_closed.getAndSet(true)) { - doClose(sessions, timeout); + _closing.set(true); + try{ + doClose(sessions, timeout); + }finally{ + _closing.set(false); + } } } @@ -1283,8 +1289,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence - if (cause instanceof IOException) + if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { + // If we have an IOE/AMQDisconnect there is no connection to close on. + _closing.set(false); closer = !_closed.getAndSet(true); _protocolHandler.getProtocolSession().notifyError(je); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..3b3d660c9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -282,7 +283,7 @@ public abstract class AMQSessiontrue if this is closed, false otherwise. + */ + @Override + public boolean isClosed() + { + return _closed.get() || _connection.isClosed(); + } + + /** + * Checks if the Session and its parent connection are capable of performing + * closing operations + * + * @return true if we are closing, false otherwise. + */ + @Override + public boolean isClosing() + { + return _closing.get()|| _connection.isClosing(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 76422c6297..add68c5b27 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -541,6 +541,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa if (!_closed.getAndSet(true)) { + _closing.set(true); if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -561,7 +562,13 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { try { - sendCancel(); + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) + { + sendCancel(); + } } catch (AMQException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java index 7e119343a1..e6771e122c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -51,6 +51,13 @@ public abstract class Closeable */ protected final AtomicBoolean _closed = new AtomicBoolean(false); + /** + * Are we in the process of closing. We have this distinction so we can + * still signal we are in the process of closing so other objects can tell + * the difference and tidy up. + */ + protected final AtomicBoolean _closing = new AtomicBoolean(false); + /** * Checks if this is closed, and raises a JMSException if it is. * @@ -74,6 +81,17 @@ public abstract class Closeable return _closed.get(); } + /** + * Checks if this is closis. + * + * @return true if we are closing, false otherwise. + */ + public boolean isClosing() + { + return _closing.get(); + } + + /** * Closes this object. * diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java new file mode 100644 index 0000000000..bb9c22f31a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; + +public class CloseAfterConnectionFailureTest extends QpidTestCase implements ExceptionListener +{ + private int sessionCount = 0; + AMQConnection connection; + Session session; + MessageConsumer consumer; + private CountDownLatch _latch = new CountDownLatch(1); + private JMSException _fail; + + public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException, + InterruptedException, JMSException + { + String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'"; + + AMQConnectionURL url = new AMQConnectionURL(connectionString); + + try + { + //Start the connection so it will use the retries + connection = new AMQConnection(url, null); + + connection.setExceptionListener(this); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(session.createQueue(this.getName())); + + //Kill connection + TransportConnection.killAllVMBrokers(); + _latch.await(); + + if (_fail != null) + { + _fail.printStackTrace(System.out); + fail("Exception thrown:" + _fail.getMessage()); + } + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void onException(JMSException e) + { + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + consumer.close(); + } + catch (JMSException jmse) + { + System.out.println("Consumer close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + //Note that if we actually do session.close() we will lock up as the session will never receive a frame + // from the + ((AMQSession) session).close(10); + } + catch (JMSException jmse) + { + System.out.println("Session close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + try + { + connection.close(); + } + catch (JMSException jmse) + { + System.out.println("Session close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + _latch.countDown(); + + } + +} -- cgit v1.2.1