summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
1 files changed, 12 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 99bf085aa6..4e259f651c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -291,6 +291,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
private final boolean _strictAMQPFATAL;
+ private final Object _messageDeliveryLock = new Object();
/**
* Creates a new session on a connection.
@@ -512,6 +513,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
+ synchronized(_messageDeliveryLock)
+ {
+
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
synchronized (_connection.getFailoverMutex())
@@ -558,6 +562,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
}
+ }
}
/**
@@ -567,6 +572,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
+ synchronized(_messageDeliveryLock)
+ {
synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
@@ -585,6 +592,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
}
+ }
}
/**
@@ -2662,7 +2670,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait();
}
- dispatchMessage(message);
+ synchronized(_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
while (connectionStopped())
{