diff options
author | Keith Wall <kwall@apache.org> | 2012-07-19 15:54:08 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-07-19 15:54:08 +0000 |
commit | aa1bb8a8842e7cdf8f3ec18a26014ab251b9f260 (patch) | |
tree | 47380b2c4fa698d5f53303f7f644a73a6c383745 | |
parent | 532386360228f49deb4eaa9a49cc6ca98f95d4fd (diff) | |
download | qpid-python-aa1bb8a8842e7cdf8f3ec18a26014ab251b9f260.tar.gz |
QPID-4114: broker release now includes BDB if optional=true sys property is set
Applied patch from Phil Harvey <phil@philharveyonline.com>.
Merged 1359595 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363396 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 31 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 81d2a49e3c..fa949f556f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1532,7 +1532,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); } - // Close connection for idle or open transactions that have timed out + // Close session for idle or open transactions that have timed out if (idleClose > 0L && idleTime > idleClose) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index cec7ff9625..7ef5124cc4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; @@ -152,8 +155,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastReceivedTime; private boolean _blocking; + private final Lock _receivedLock; + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { + _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(virtualHostRegistry, this); _codecFactory = new AMQCodecFactory(true, this); @@ -225,6 +231,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + + _receivedLock.lock(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -249,6 +257,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.error("Unexpected exception when processing datablock", e); closeProtocolSession(); } + finally + { + _receivedLock.unlock(); + } } private void receiveComplete() @@ -815,7 +827,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void closeConnection(int channelId, AMQConnectionException e) throws AMQException + private void closeConnection(int channelId, AMQConnectionException e) throws AMQException { try { @@ -1308,17 +1320,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - int channelId = ((AMQChannel)session).getChannelId(); - closeChannel(channelId); + _receivedLock.lock(); + try + { + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - new AMQShortString(message), - 0,0); + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + new AMQShortString(message), + 0,0); - writeFrame(responseBody.generateFrame(channelId)); + writeFrame(responseBody.generateFrame(channelId)); + } + finally + { + _receivedLock.unlock(); + } } public void close(AMQConstant cause, String message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index e833069320..01666ca58b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -154,10 +154,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void closeProtocolSession(); - /** This must be called to close the session in order to free up any resources managed by the session. */ - void closeConnection(int channelId, AMQConnectionException e) throws AMQException; - - /** @return a key that uniquely identifies this session */ Object getKey(); |