summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-07-19 15:59:38 +0000
committerKeith Wall <kwall@apache.org>2012-07-19 15:59:38 +0000
commit6d68dc2b81e20e28b159a3d248099e561a1f21a5 (patch)
tree7bcc34b85401b1baf4baf23e904eb164d2790bd8
parentaa1bb8a8842e7cdf8f3ec18a26014ab251b9f260 (diff)
downloadqpid-python-6d68dc2b81e20e28b159a3d248099e561a1f21a5.tar.gz
QPID-4131: On 0-8...0-9-1 code path broker now closes the connection when the housekeeping thread times out a transaction. AMQChannel now uses AMQProtocolEngine's _receivedLock so that this connection-closing is thread-safe. This gives better compatibility with older clients that do not hand session closes correctly. 0-10 behaviour unaffected by this change.
Merged 1360651 from trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363403 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
3 files changed, 40 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index fa949f556f..6479911ea9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -36,6 +36,8 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -1532,18 +1534,37 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
}
- // Close session for idle or open transactions that have timed out
+ // Close _connection_ for idle or open transactions that have timed out (this is different
+ // than the 0-10 code path which closes the session).
if (idleClose > 0L && idleTime > idleClose)
{
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ closeConnection("Idle transaction timed out");
}
else if (openClose > 0L && openTime > openClose)
{
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ closeConnection("Open transaction timed out");
}
}
}
+ /**
+ * Typically called from the HouseKeepingThread instead of the main receiver thread,
+ * therefore uses a lock to close the connection in a thread-safe manner.
+ */
+ private void closeConnection(String reason) throws AMQException
+ {
+ Lock receivedLock = _session.getReceivedLock();
+ receivedLock.lock();
+ try
+ {
+ _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ }
+ finally
+ {
+ receivedLock.unlock();
+ }
+ }
+
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 7ef5124cc4..ff7dff2473 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1320,25 +1320,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
- _receivedLock.lock();
- try
- {
- int channelId = ((AMQChannel)session).getChannelId();
- closeChannel(channelId);
+ int channelId = ((AMQChannel)session).getChannelId();
+ closeChannel(channelId);
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause.getCode(),
- new AMQShortString(message),
- 0,0);
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
- writeFrame(responseBody.generateFrame(channelId));
- }
- finally
- {
- _receivedLock.unlock();
- }
+ writeFrame(responseBody.generateFrame(channelId));
}
public void close(AMQConstant cause, String message) throws AMQException
@@ -1494,4 +1486,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _reference;
}
+
+ public Lock getReceivedLock()
+ {
+ return _receivedLock;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 01666ca58b..ba806c04bd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol;
import java.net.SocketAddress;
import java.security.Principal;
import java.util.List;
+import java.util.concurrent.locks.Lock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -217,4 +217,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
public Principal getPeerPrincipal();
+ Lock getReceivedLock();
}