From c10241126321f173df7a3c77fb5c049c298029f8 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:40:14 +0000 Subject: QPID-849 : Client Deadlock, there are various points where we take the failover mutex to check the _closed values which if it is false then that is all we do. As the _closed check doesn't require the mutex then move all the checks out side of the mutex lock. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637986 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 113 +++++++++++---------- .../java/org/apache/qpid/client/AMQSession.java | 58 ++++++----- 2 files changed, 88 insertions(+), 83 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2d4d2be8fb..7a54617bf1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -656,71 +656,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - checkNotClosed(); + checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } + if (channelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } - return new FailoverRetrySupport( - new FailoverProtectedOperation() - { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + return new FailoverRetrySupport( + new FailoverProtectedOperation() { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); + int channelId = _idFactory.incrementAndGet(); - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) + if (_logger.isDebugEnabled()) { - deregisterSession(channelId); + _logger.debug("Write channel open frame for channel id " + channelId); } - } - if (_started) - { + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; try { - session.start(); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; } catch (AMQException e) { - throw new JMSAMQException(e); + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + deregisterSession(channelId); + } } - } - return session; - } - }, this).execute(); + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; + } + }, this).execute(); } } @@ -732,13 +732,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); if (transacted) { @@ -925,14 +925,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } else - { - synchronized (getFailoverMutex()) { if (!_closed.getAndSet(true)) { - try + + synchronized (getFailoverMutex()) { - long startCloseTime = System.currentTimeMillis(); + try + { + long startCloseTime = System.currentTimeMillis(); closeAllSessions(null, timeout, startCloseTime); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0a51ec7c47..c3219e6564 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -594,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } - synchronized (_connection.getFailoverMutex()) + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_messageDeliveryLock) { // we pass null since this is not an error case closeProducersAndConsumers(null); @@ -655,33 +655,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. // We need to determin here if the connection should be - synchronized (_connection.getFailoverMutex()) + if (e instanceof AMQDisconnectedException) { - if (e instanceof AMQDisconnectedException) + if (_dispatcher != null) { - if (_dispatcher != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); - } + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); } - synchronized (_messageDeliveryLock) + } + + if (!_closed.getAndSet(true)) + { + synchronized (_connection.getFailoverMutex()) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_messageDeliveryLock) { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } } -- cgit v1.2.1