summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-14 15:05:10 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-14 15:05:10 +0000
commit0f66d4c96c6810d01c6dc7e5c571732430c7aa0e (patch)
tree695454a1c3e0c940d8fb6de1a04e709ffedc488f
parenta2c4c70d6218a9ff70372d55c8528e48f033d542 (diff)
downloadqpid-python-0.5-fix.tar.gz
Merged change to SlowMessageStore to ensure we don't infinitely configure the same class.0.5-fix
QPID-1621: add ServerConfiguration, QueueConfiguration and SecurityConfiguration classes. Move almost all uses of o.a.commons.configuration.Configuration behind there. @Configured delenda est Merged changed from trunk r745799 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764816 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/010ExcludeList2
-rw-r--r--qpid/java/08ExcludeList-nonvm4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java18
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java121
7 files changed, 208 insertions, 13 deletions
diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList
index c685f2dd6b..d4e9d48fe5 100644
--- a/qpid/java/010ExcludeList
+++ b/qpid/java/010ExcludeList
@@ -59,3 +59,5 @@ org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
org.apache.qpid.server.queue.PriorityTest#*
org.apache.qpid.server.queue.TimeToLiveTest#*
+// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*
diff --git a/qpid/java/08ExcludeList-nonvm b/qpid/java/08ExcludeList-nonvm
index eb6c60b225..43e998e6b7 100644
--- a/qpid/java/08ExcludeList-nonvm
+++ b/qpid/java/08ExcludeList-nonvm
@@ -27,3 +27,7 @@ org.apache.qpid.server.security.acl.SimpleACLTest#*
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
+org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
+
+// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 269937d0bd..5347e20e96 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -58,6 +58,7 @@ import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -889,7 +890,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!_closed.getAndSet(true))
{
- doClose(sessions, timeout);
+ _closing.set(true);
+ try{
+ doClose(sessions, timeout);
+ }finally{
+ _closing.set(false);
+ }
}
}
@@ -1283,8 +1289,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
- if (cause instanceof IOException)
+ if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
{
+ // If we have an IOE/AMQDisconnect there is no connection to close on.
+ _closing.set(false);
closer = !_closed.getAndSet(true);
_protocolHandler.getProtocolSession().notifyError(je);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 733bee2d81..3b3d660c9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.io.Serializable;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -282,7 +283,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
-
+
protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
@@ -625,6 +626,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
+ _closing.set(true);
synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
@@ -636,7 +638,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendClose(timeout);
+ // If the connection is open or we are in the process
+ // of closing the connection then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_connection.isClosed() || _connection.isClosing())
+ {
+ sendClose(timeout);
+ }
}
catch (AMQException e)
{
@@ -683,7 +691,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Failover failed and ain't coming back. Knife the dispatcher.
_dispatcherThread.interrupt();
}
- }
+
+ }
+
+ //if we don't have an exception then we can perform closing operations
+ _closing.set(e == null);
if (!_closed.getAndSet(true))
{
@@ -1210,9 +1222,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// this is done so that we can produce to a temporary queue before we create a consumer
result.setQueueName(result.getRoutingKey());
- createQueue(result.getAMQQueueName(), result.isAutoDelete(),
+ createQueue(result.getAMQQueueName(), result.isAutoDelete(),
result.isDurable(), result.isExclusive());
- bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
+ bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
new FieldTable(), result.getExchangeName(), result);
return result;
}
@@ -1674,11 +1686,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
// rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
-
+
if (messageSelector != null)
{
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
@@ -1918,13 +1930,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcher = new Dispatcher();
try
{
- _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
-
+ _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
}
catch(Exception e)
{
throw new Error("Error creating Dispatcher thread",e);
- }
+ }
_dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
_dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
@@ -2971,4 +2983,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
}
+
+ /**
+ * Checks if the Session and its parent connection are closed
+ *
+ * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise.
+ */
+ @Override
+ public boolean isClosed()
+ {
+ return _closed.get() || _connection.isClosed();
+ }
+
+ /**
+ * Checks if the Session and its parent connection are capable of performing
+ * closing operations
+ *
+ * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+ */
+ @Override
+ public boolean isClosing()
+ {
+ return _closing.get()|| _connection.isClosing();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 76422c6297..add68c5b27 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -541,6 +541,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (!_closed.getAndSet(true))
{
+ _closing.set(true);
if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -561,7 +562,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
try
{
- sendCancel();
+ // If the session is open or we are in the process
+ // of closing the session then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_session.isClosed() || _session.isClosing())
+ {
+ sendCancel();
+ }
}
catch (AMQException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
index 7e119343a1..e6771e122c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
@@ -52,6 +52,13 @@ public abstract class Closeable
protected final AtomicBoolean _closed = new AtomicBoolean(false);
/**
+ * Are we in the process of closing. We have this distinction so we can
+ * still signal we are in the process of closing so other objects can tell
+ * the difference and tidy up.
+ */
+ protected final AtomicBoolean _closing = new AtomicBoolean(false);
+
+ /**
* Checks if this is closed, and raises a JMSException if it is.
*
* @throws JMSException If this is closed.
@@ -75,6 +82,17 @@ public abstract class Closeable
}
/**
+ * Checks if this is closis.
+ *
+ * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+ */
+ public boolean isClosing()
+ {
+ return _closing.get();
+ }
+
+
+ /**
* Closes this object.
*
* @throws JMSException If this cannot be closed for any reason.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
new file mode 100644
index 0000000000..bb9c22f31a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+
+public class CloseAfterConnectionFailureTest extends QpidTestCase implements ExceptionListener
+{
+ private int sessionCount = 0;
+ AMQConnection connection;
+ Session session;
+ MessageConsumer consumer;
+ private CountDownLatch _latch = new CountDownLatch(1);
+ private JMSException _fail;
+
+ public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException,
+ InterruptedException, JMSException
+ {
+ String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'";
+
+ AMQConnectionURL url = new AMQConnectionURL(connectionString);
+
+ try
+ {
+ //Start the connection so it will use the retries
+ connection = new AMQConnection(url, null);
+
+ connection.setExceptionListener(this);
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(session.createQueue(this.getName()));
+
+ //Kill connection
+ TransportConnection.killAllVMBrokers();
+ _latch.await();
+
+ if (_fail != null)
+ {
+ _fail.printStackTrace(System.out);
+ fail("Exception thrown:" + _fail.getMessage());
+ }
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void onException(JMSException e)
+ {
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+ try
+ {
+ consumer.close();
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Consumer close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+ try
+ {
+ //Note that if we actually do session.close() we will lock up as the session will never receive a frame
+ // from the
+ ((AMQSession) session).close(10);
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Session close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Session close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+ _latch.countDown();
+
+ }
+
+}