summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java80
1 files changed, 33 insertions, 47 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6b3b4601d9..d9298abd0f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -81,7 +81,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
- private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+ private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT,
+ ClientProperties.DEFAULT_CLOSE_TIMEOUT);
private final long _connectionNumber;
@@ -880,44 +881,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- long startCloseTime = System.currentTimeMillis();
-
- closeAllSessions(null, timeout, startCloseTime);
-
- //This MUST occur after we have successfully closed all Channels/Sessions
- _taskPool.shutdown();
-
- if (!_taskPool.isTerminated())
+ try
{
- try
- {
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
-
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
+ closeAllSessions(null, timeout);
}
-
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interupt any running tasks.
- if (!_taskPool.isTerminated())
+ finally
{
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ shutdownTaskPool(timeout);
}
}
catch (JMSException e)
{
- _logger.error("error:", e);
+ _logger.error("Error closing connection", e);
JMSException jmse = new JMSException("Error closing connection: " + e);
jmse.setLinkedException(e);
jmse.initCause(e);
@@ -939,16 +915,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- private long adjustTimeout(long timeout, long startTime)
+ private void shutdownTaskPool(final long timeout)
{
- long now = System.currentTimeMillis();
- timeout -= now - startTime;
- if (timeout < 0)
+ _taskPool.shutdown();
+
+ if (!_taskPool.isTerminated())
{
- timeout = 0;
+ try
+ {
+ _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
}
- return timeout;
+ //If the taskpool hasn't shutdown by now then give it shutdownNow.
+ // This will interrupt any running tasks.
+ if (!_taskPool.isTerminated())
+ {
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
+ }
}
/**
@@ -976,7 +968,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
* before calling this method.
*/
- private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException
+ private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
@@ -992,11 +984,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- if (starttime != -1)
- {
- timeout = adjustTimeout(timeout, starttime);
- }
-
session.close(timeout);
}
catch (JMSException e)
@@ -1042,7 +1029,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
- // TODO Auto-generated method stub
checkNotClosed();
throw new JmsNotImplementedException();
@@ -1322,7 +1308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
catch (JMSException e)
{
@@ -1444,7 +1430,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQShortString getTemporaryQueueExchangeName()
{
- return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
+ return _temporaryQueueExchangeName;
}
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)