diff options
author | Keith Wall <kwall@apache.org> | 2012-07-19 15:59:38 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-07-19 15:59:38 +0000 |
commit | 6d68dc2b81e20e28b159a3d248099e561a1f21a5 (patch) | |
tree | 7bcc34b85401b1baf4baf23e904eb164d2790bd8 | |
parent | aa1bb8a8842e7cdf8f3ec18a26014ab251b9f260 (diff) | |
download | qpid-python-6d68dc2b81e20e28b159a3d248099e561a1f21a5.tar.gz |
QPID-4131: On 0-8...0-9-1 code path broker now closes the connection when the housekeeping thread times out a transaction. AMQChannel now uses AMQProtocolEngine's _receivedLock so that this connection-closing is thread-safe. This gives better compatibility with older clients that do not hand session closes correctly. 0-10 behaviour unaffected by this change.
Merged 1360651 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363403 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 40 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fa949f556f..6479911ea9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -36,6 +36,8 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; @@ -1532,18 +1534,37 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); } - // Close session for idle or open transactions that have timed out + // Close _connection_ for idle or open transactions that have timed out (this is different + // than the 0-10 code path which closes the session). if (idleClose > 0L && idleTime > idleClose) { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + closeConnection("Idle transaction timed out"); } else if (openClose > 0L && openTime > openClose) { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + closeConnection("Open transaction timed out"); } } } + /** + * Typically called from the HouseKeepingThread instead of the main receiver thread, + * therefore uses a lock to close the connection in a thread-safe manner. + */ + private void closeConnection(String reason) throws AMQException + { + Lock receivedLock = _session.getReceivedLock(); + receivedLock.lock(); + try + { + _session.close(AMQConstant.RESOURCE_ERROR, reason); + } + finally + { + receivedLock.unlock(); + } + } + public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 7ef5124cc4..ff7dff2473 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1320,25 +1320,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - _receivedLock.lock(); - try - { - int channelId = ((AMQChannel)session).getChannelId(); - closeChannel(channelId); + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - new AMQShortString(message), - 0,0); + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + new AMQShortString(message), + 0,0); - writeFrame(responseBody.generateFrame(channelId)); - } - finally - { - _receivedLock.unlock(); - } + writeFrame(responseBody.generateFrame(channelId)); } public void close(AMQConstant cause, String message) throws AMQException @@ -1494,4 +1486,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _reference; } + + public Lock getReceivedLock() + { + return _receivedLock; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 01666ca58b..ba806c04bd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol; import java.net.SocketAddress; import java.security.Principal; import java.util.List; +import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -217,4 +217,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public Principal getPeerPrincipal(); + Lock getReceivedLock(); } |