From 5e8136af6e36d5f2689dd07e70095546c0120dbc Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 17 Oct 2014 13:58:04 +0000 Subject: merged from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1632579 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 80 +++++++++------------- 1 file changed, 33 insertions(+), 47 deletions(-) (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6b3b4601d9..d9298abd0f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -81,7 +81,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); - private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, + ClientProperties.DEFAULT_CLOSE_TIMEOUT); private final long _connectionNumber; @@ -880,44 +881,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - long startCloseTime = System.currentTimeMillis(); - - closeAllSessions(null, timeout, startCloseTime); - - //This MUST occur after we have successfully closed all Channels/Sessions - _taskPool.shutdown(); - - if (!_taskPool.isTerminated()) + try { - try - { - // adjust timeout - long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - - _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } + closeAllSessions(null, timeout); } - - // adjust timeout - timeout = adjustTimeout(timeout, startCloseTime); - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interupt any running tasks. - if (!_taskPool.isTerminated()) + finally { - List tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } } catch (JMSException e) { - _logger.error("error:", e); + _logger.error("Error closing connection", e); JMSException jmse = new JMSException("Error closing connection: " + e); jmse.setLinkedException(e); jmse.initCause(e); @@ -939,16 +915,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private long adjustTimeout(long timeout, long startTime) + private void shutdownTaskPool(final long timeout) { - long now = System.currentTimeMillis(); - timeout -= now - startTime; - if (timeout < 0) + _taskPool.shutdown(); + + if (!_taskPool.isTerminated()) { - timeout = 0; + try + { + _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } } - return timeout; + //If the taskpool hasn't shutdown by now then give it shutdownNow. + // This will interrupt any running tasks. + if (!_taskPool.isTerminated()) + { + List tasks = _taskPool.shutdownNow(); + for (Runnable r : tasks) + { + _logger.warn("Connection close forced taskpool to prevent execution:" + r); + } + } } /** @@ -976,7 +968,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param cause if not null, the error that is causing this shutdown

The caller must hold the failover mutex * before calling this method. */ - private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException + private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); @@ -992,11 +984,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - if (starttime != -1) - { - timeout = adjustTimeout(timeout, starttime); - } - session.close(timeout); } catch (JMSException e) @@ -1042,7 +1029,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - // TODO Auto-generated method stub checkNotClosed(); throw new JmsNotImplementedException(); @@ -1322,7 +1308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } catch (JMSException e) { @@ -1444,7 +1430,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; } public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) -- cgit v1.2.1