summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-17 17:40:14 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-17 17:40:14 +0000
commitc10241126321f173df7a3c77fb5c049c298029f8 (patch)
tree335b7f7d2ace08b01620f31ba8c5ac2f20d821e1
parentfb03c42a89ce3d647416bb075d3a750b7b564e05 (diff)
downloadqpid-python-c10241126321f173df7a3c77fb5c049c298029f8.tar.gz
QPID-849 : Client Deadlock, there are various points where we take the failover mutex to check the _closed values which if it is false then that is all we do. As the _closed check doesn't require the mutex then move all the checks out side of the mutex lock.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637986 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java113
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java58
2 files changed, 88 insertions, 83 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2d4d2be8fb..7a54617bf1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -656,71 +656,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
- synchronized(_sessionCreationLock)
+ synchronized (_sessionCreationLock)
{
- checkNotClosed();
+ checkNotClosed();
- if (channelLimitReached())
- {
- throw new ChannelLimitReachedException(_maximumChannelCount);
- }
+ if (channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_maximumChannelCount);
+ }
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
- {
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+ return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+ new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
{
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
+ public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
{
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- // _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
+ int channelId = _idFactory.incrementAndGet();
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
+ if (_logger.isDebugEnabled())
{
- deregisterSession(channelId);
+ _logger.debug("Write channel open frame for channel id " + channelId);
}
- }
- if (_started)
- {
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ // _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
+
+ boolean success = false;
try
{
- session.start();
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
}
catch (AMQException e)
{
- throw new JMSAMQException(e);
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
+ {
+ deregisterSession(channelId);
+ }
}
- }
- return session;
- }
- }, this).execute();
+ if (_started)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ return session;
+ }
+ }, this).execute();
}
}
@@ -732,13 +732,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+ _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+ _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
if (transacted)
{
@@ -926,13 +926,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- synchronized (getFailoverMutex())
- {
if (!_closed.getAndSet(true))
{
- try
+
+ synchronized (getFailoverMutex())
{
- long startCloseTime = System.currentTimeMillis();
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
closeAllSessions(null, timeout, startCloseTime);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0a51ec7c47..c3219e6564 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -594,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
- synchronized (_connection.getFailoverMutex())
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_messageDeliveryLock)
+ synchronized (_connection.getFailoverMutex())
{
- // Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (_messageDeliveryLock)
{
// we pass null since this is not an error case
closeProducersAndConsumers(null);
@@ -655,33 +655,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
// We need to determin here if the connection should be
- synchronized (_connection.getFailoverMutex())
+ if (e instanceof AMQDisconnectedException)
{
- if (e instanceof AMQDisconnectedException)
+ if (_dispatcher != null)
{
- if (_dispatcher != null)
- {
- // Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcher.interrupt();
- }
+ // Failover failed and ain't coming back. Knife the dispatcher.
+ _dispatcher.interrupt();
}
- synchronized (_messageDeliveryLock)
+ }
+
+ if (!_closed.getAndSet(true))
+ {
+ synchronized (_connection.getFailoverMutex())
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
+ synchronized (_messageDeliveryLock)
{
- amqe = new AMQException("Closing session forcibly", e);
- }
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
+ }
}
}
}