summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-07-19 15:54:08 +0000
committerKeith Wall <kwall@apache.org>2012-07-19 15:54:08 +0000
commitaa1bb8a8842e7cdf8f3ec18a26014ab251b9f260 (patch)
tree47380b2c4fa698d5f53303f7f644a73a6c383745
parent532386360228f49deb4eaa9a49cc6ca98f95d4fd (diff)
downloadqpid-python-aa1bb8a8842e7cdf8f3ec18a26014ab251b9f260.tar.gz
QPID-4114: broker release now includes BDB if optional=true sys property is set
Applied patch from Phil Harvey <phil@philharveyonline.com>. Merged 1359595 from trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363396 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
3 files changed, 31 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 81d2a49e3c..fa949f556f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1532,7 +1532,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
}
- // Close connection for idle or open transactions that have timed out
+ // Close session for idle or open transactions that have timed out
if (idleClose > 0L && idleTime > idleClose)
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index cec7ff9625..7ef5124cc4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
@@ -152,8 +155,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _lastReceivedTime;
private boolean _blocking;
+ private final Lock _receivedLock;
+
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
+ _receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -225,6 +231,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+
+ _receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -249,6 +257,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
}
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
private void receiveComplete()
@@ -815,7 +827,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
try
{
@@ -1308,17 +1320,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
- int channelId = ((AMQChannel)session).getChannelId();
- closeChannel(channelId);
+ _receivedLock.lock();
+ try
+ {
+ int channelId = ((AMQChannel)session).getChannelId();
+ closeChannel(channelId);
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause.getCode(),
- new AMQShortString(message),
- 0,0);
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
- writeFrame(responseBody.generateFrame(channelId));
+ writeFrame(responseBody.generateFrame(channelId));
+ }
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
public void close(AMQConstant cause, String message) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index e833069320..01666ca58b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -154,10 +154,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void closeProtocolSession();
- /** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
-
-
/** @return a key that uniquely identifies this session */
Object getKey();