diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-03-17 17:40:14 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-03-17 17:40:14 +0000 |
commit | c10241126321f173df7a3c77fb5c049c298029f8 (patch) | |
tree | 335b7f7d2ace08b01620f31ba8c5ac2f20d821e1 | |
parent | fb03c42a89ce3d647416bb075d3a750b7b564e05 (diff) | |
download | qpid-python-c10241126321f173df7a3c77fb5c049c298029f8.tar.gz |
QPID-849 : Client Deadlock, there are various points where we take the failover mutex to check the _closed values which if it is false then that is all we do. As the _closed check doesn't require the mutex then move all the checks out side of the mutex lock.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637986 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 113 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 58 |
2 files changed, 88 insertions, 83 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2d4d2be8fb..7a54617bf1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -656,71 +656,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - checkNotClosed(); + checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } + if (channelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } - return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( - new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() - { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( + new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); + int channelId = _idFactory.incrementAndGet(); - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) + if (_logger.isDebugEnabled()) { - deregisterSession(channelId); + _logger.debug("Write channel open frame for channel id " + channelId); } - } - if (_started) - { + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; try { - session.start(); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; } catch (AMQException e) { - throw new JMSAMQException(e); + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + deregisterSession(channelId); + } } - } - return session; - } - }, this).execute(); + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; + } + }, this).execute(); } } @@ -732,13 +732,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); if (transacted) { @@ -926,13 +926,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - synchronized (getFailoverMutex()) - { if (!_closed.getAndSet(true)) { - try + + synchronized (getFailoverMutex()) { - long startCloseTime = System.currentTimeMillis(); + try + { + long startCloseTime = System.currentTimeMillis(); closeAllSessions(null, timeout, startCloseTime); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0a51ec7c47..c3219e6564 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -594,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } - synchronized (_connection.getFailoverMutex()) + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_messageDeliveryLock) { // we pass null since this is not an error case closeProducersAndConsumers(null); @@ -655,33 +655,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. // We need to determin here if the connection should be - synchronized (_connection.getFailoverMutex()) + if (e instanceof AMQDisconnectedException) { - if (e instanceof AMQDisconnectedException) + if (_dispatcher != null) { - if (_dispatcher != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); - } + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); } - synchronized (_messageDeliveryLock) + } + + if (!_closed.getAndSet(true)) + { + synchronized (_connection.getFailoverMutex()) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_messageDeliveryLock) { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } } |